Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/bin/ampctl/src/cmd/manifest/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ fn generate_evm_rpc_manifest(
let manifest_table = ManifestTable {
schema,
network: network.clone(),
bloom_filter_columns: Vec::new(),
};
(table.name().to_string(), manifest_table)
})
Expand Down Expand Up @@ -204,6 +205,7 @@ fn generate_solana_manifest(
let manifest_table = ManifestTable {
schema,
network: network.clone(),
bloom_filter_columns: Vec::new(),
};
(table.name().to_string(), manifest_table)
})
Expand Down Expand Up @@ -234,6 +236,7 @@ fn generate_firehose_manifest(
let manifest_table = ManifestTable {
schema,
network: network.clone(),
bloom_filter_columns: Vec::new(),
};
(table.name().to_string(), manifest_table)
})
Expand Down Expand Up @@ -264,6 +267,7 @@ fn generate_tempo_manifest(
let manifest_table = ManifestTable {
schema,
network: network.clone(),
bloom_filter_columns: Vec::new(),
};
(table.name().to_string(), manifest_table)
})
Expand Down
5 changes: 0 additions & 5 deletions crates/config/src/worker_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ pub struct ParquetConfig {
/// Compression algorithm (default: `zstd(1)`).
#[serde(default)]
pub compression: Compression,
/// Enable Parquet bloom filters (default: false).
#[serde(default)]
pub bloom_filters: bool,
/// Parquet metadata cache size in MB (default: 1024).
#[serde(default = "default_cache_size_mb")]
pub cache_size_mb: u64,
Expand Down Expand Up @@ -48,7 +45,6 @@ impl Default for ParquetConfig {
fn default() -> Self {
Self {
compression: Compression::default(),
bloom_filters: false,
cache_size_mb: default_cache_size_mb(),
max_row_group_mb: default_max_row_group_mb(),
target_size: SizeLimitConfig::default(),
Expand All @@ -71,7 +67,6 @@ impl From<&ParquetConfig> for amp_worker_core::ParquetConfig {
fn from(config: &ParquetConfig) -> Self {
Self {
compression: (&config.compression).into(),
bloom_filters: config.bloom_filters,
cache_size_mb: config.cache_size_mb,
max_row_group_mb: config.max_row_group_mb,
target_size: (&config.target_size).into(),
Expand Down
9 changes: 8 additions & 1 deletion crates/core/datasets-common/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use downcast_rs::{DowncastSync, impl_downcast};

use crate::{
block_num::BlockNum, dataset_kind_str::DatasetKindStr, hash_reference::HashReference,
network_id::NetworkId, table_name::TableName,
manifest::BloomFilterColumnConfig, network_id::NetworkId, table_name::TableName,
};

/// Core trait representing a dataset definition.
Expand Down Expand Up @@ -95,6 +95,13 @@ pub trait Table: DowncastSync + std::fmt::Debug {

/// Returns column names by which this table is naturally sorted. Always includes `_block_num`.
fn sorted_by(&self) -> &BTreeSet<String>;

/// Returns the bloom filter column configurations for this table.
///
/// Empty by default — tables without bloom filter config get no bloom filters.
fn bloom_filter_columns(&self) -> &[BloomFilterColumnConfig] {
&[]
}
}

// Implement downcasting for `Table`.
Expand Down
49 changes: 49 additions & 0 deletions crates/core/datasets-common/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,55 @@ impl Default for Schema {
}
}

/// Configuration for a bloom filter on a specific column.
///
/// Bloom filters enable efficient row-group pruning during queries by quickly
/// determining whether a row group could contain a specific value. The `ndv`
/// (number of distinct values) parameter tunes the filter's false-positive rate.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct BloomFilterColumnConfig {
    /// Column name to apply the bloom filter to.
    ///
    /// Must name a field in the owning table's Arrow schema; see
    /// [`validate_bloom_filter_columns`] for the check.
    pub column: String,
    /// Estimated number of distinct values for this column (default: 10,000).
    ///
    /// The default is supplied by serde when the field is omitted during
    /// deserialization, so hand-written manifests may leave it out.
    #[serde(default = "default_bloom_filter_ndv")]
    pub ndv: u64,
}

/// Fallback `ndv` estimate used when a manifest entry omits the field.
fn default_bloom_filter_ndv() -> u64 {
    const DEFAULT_NDV: u64 = 10_000;
    DEFAULT_NDV
}

/// Validates that all bloom filter column references exist in the given schema.
///
/// Returns an error describing the first invalid column found, or `Ok(())` if all
/// columns are valid.
pub fn validate_bloom_filter_columns(
    table_name: &str,
    schema: &datafusion::arrow::datatypes::SchemaRef,
    columns: &[BloomFilterColumnConfig],
) -> Result<(), InvalidBloomFilterColumn> {
    // Surface the first configured column that the Arrow schema does not contain.
    columns
        .iter()
        .find(|config| schema.field_with_name(&config.column).is_err())
        .map_or(Ok(()), |invalid| {
            Err(InvalidBloomFilterColumn {
                table_name: table_name.to_owned(),
                column: invalid.column.clone(),
            })
        })
}

/// A bloom filter column reference does not match any column in the table schema.
///
/// Returned by [`validate_bloom_filter_columns`]; the `Display` message produced
/// by the `thiserror` derive names both the offending column and its table.
#[derive(Debug, thiserror::Error)]
#[error("bloom filter column '{column}' does not exist in table '{table_name}'")]
pub struct InvalidBloomFilterColumn {
    /// Table that contains the invalid reference.
    pub table_name: String,
    /// Column name that was referenced but not found.
    pub column: String,
}

/// Apache Arrow data _new-type_ wrapper with JSON schema support.
///
/// This wrapper provides serialization and JSON schema generation capabilities
Expand Down
37 changes: 32 additions & 5 deletions crates/core/datasets-derived/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use datasets_common::{
dataset::Table as TableTrait,
dataset_kind_str::DatasetKindStr,
hash_reference::HashReference,
manifest::{BloomFilterColumnConfig, InvalidBloomFilterColumn, validate_bloom_filter_columns},
table_name::TableName,
};

Expand Down Expand Up @@ -44,8 +45,16 @@ pub fn dataset(reference: HashReference, manifest: Manifest) -> Result<Dataset,
let unsorted_tables: Vec<Table> = manifest
.tables
.into_iter()
.map(|(name, table)| Table::new(name, table.schema.arrow.into(), vec![]))
.collect();
.map(|(name, table)| {
Table::new(
name,
table.schema.arrow.into(),
vec![],
table.bloom_filter_columns,
)
.map_err(DatasetError::InvalidBloomFilterConfig)
})
.collect::<Result<Vec<_>, DatasetError>>()?;
let tables = sort_tables_by_dependencies(unsorted_tables, &queries)
.map_err(DatasetError::SortTableDependencies)?;

Expand Down Expand Up @@ -162,6 +171,10 @@ pub enum DatasetError {
/// Failed to sort tables by their SQL dependencies
#[error("Failed to sort tables by dependencies")]
SortTableDependencies(#[source] SortTablesByDependenciesError),

/// A bloom filter column reference does not exist in the table schema
#[error(transparent)]
InvalidBloomFilterConfig(#[from] InvalidBloomFilterColumn),
}

/// A table definition for a derived dataset. Has no network association.
Expand All @@ -170,18 +183,28 @@ pub struct Table {
name: TableName,
schema: SchemaRef,
sorted_by: BTreeSet<String>,
bloom_filter_columns: Vec<BloomFilterColumnConfig>,
}

impl Table {
/// Creates a new derived table definition. Automatically adds `_block_num` to `sorted_by`.
pub fn new(name: TableName, schema: SchemaRef, sorted_by: Vec<String>) -> Self {
///
/// Returns an error if any bloom filter column does not exist in the schema.
pub fn new(
name: TableName,
schema: SchemaRef,
sorted_by: Vec<String>,
bloom_filter_columns: Vec<BloomFilterColumnConfig>,
) -> Result<Self, InvalidBloomFilterColumn> {
validate_bloom_filter_columns(name.as_str(), &schema, &bloom_filter_columns)?;
let mut sorted_by: BTreeSet<String> = sorted_by.into_iter().collect();
sorted_by.insert(RESERVED_BLOCK_NUM_COLUMN_NAME.to_string());
Self {
Ok(Self {
name,
schema,
sorted_by,
}
bloom_filter_columns,
})
}
}

Expand All @@ -197,6 +220,10 @@ impl TableTrait for Table {
fn sorted_by(&self) -> &BTreeSet<String> {
&self.sorted_by
}

fn bloom_filter_columns(&self) -> &[BloomFilterColumnConfig] {
&self.bloom_filter_columns
}
}

/// Sort tables by their SQL dependencies using topological ordering.
Expand Down
6 changes: 5 additions & 1 deletion crates/core/datasets-derived/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use std::collections::BTreeMap;

// Re-export schema types from datasets-common
pub use datasets_common::manifest::{ArrowSchema, Field, TableSchema};
pub use datasets_common::manifest::{ArrowSchema, BloomFilterColumnConfig, Field, TableSchema};
use datasets_common::table_name::TableName;

use crate::{
Expand Down Expand Up @@ -51,6 +51,10 @@ pub struct Table {
pub input: TableInput,
/// Arrow schema definition for the table
pub schema: TableSchema,
/// Columns to apply bloom filters to for row-group pruning.
/// Omit to disable bloom filters for this table.
#[serde(default)]
pub bloom_filter_columns: Vec<BloomFilterColumnConfig>,
}

/// Input source for a table definition.
Expand Down
33 changes: 31 additions & 2 deletions crates/core/datasets-raw/src/dataset.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
//! Concrete raw dataset and table types.

use std::{collections::BTreeSet, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};

use arrow::datatypes::SchemaRef;
use datasets_common::{
block_num::{BlockNum, RESERVED_BLOCK_NUM_COLUMN_NAME},
dataset::Table as TableTrait,
dataset_kind_str::DatasetKindStr,
hash_reference::HashReference,
manifest::{BloomFilterColumnConfig, validate_bloom_filter_columns},
network_id::NetworkId,
table_name::TableName,
};
Expand Down Expand Up @@ -37,7 +41,8 @@ impl Dataset {
reference: HashReference,
kind: DatasetKindStr,
network: NetworkId,
tables: Vec<Table>,
mut tables: Vec<Table>,
manifest_tables: &BTreeMap<String, crate::manifest::Table>,
start_block: Option<BlockNum>,
finalized_blocks_only: bool,
) -> Self {
Expand All @@ -50,6 +55,19 @@ impl Dataset {
"all tables must belong to the same network as the dataset"
);

// Apply per-table bloom filter config from the manifest, validating column references.
for table in &mut tables {
if let Some(mt) = manifest_tables.get(table.name().as_str()) {
validate_bloom_filter_columns(
table.name().as_str(),
table.schema(),
&mt.bloom_filter_columns,
)
.expect("bloom filter columns must reference valid schema columns");
table.set_bloom_filter_columns(mt.bloom_filter_columns.clone());
}
}

let tables: Vec<Arc<dyn TableTrait>> = tables
.into_iter()
.map(|t| Arc::new(t) as Arc<dyn TableTrait>)
Expand Down Expand Up @@ -111,6 +129,7 @@ pub struct Table {
schema: SchemaRef,
network: NetworkId,
sorted_by: BTreeSet<String>,
bloom_filter_columns: Vec<BloomFilterColumnConfig>,
}

impl Table {
Expand All @@ -128,13 +147,19 @@ impl Table {
schema,
network,
sorted_by,
bloom_filter_columns: Vec::new(),
}
}

/// Returns the network this raw table belongs to.
pub fn network_ref(&self) -> &NetworkId {
&self.network
}

/// Sets the bloom filter column configurations for this table.
pub fn set_bloom_filter_columns(&mut self, columns: Vec<BloomFilterColumnConfig>) {
self.bloom_filter_columns = columns;
}
}

impl TableTrait for Table {
Expand All @@ -153,4 +178,8 @@ impl TableTrait for Table {
fn sorted_by(&self) -> &BTreeSet<String> {
&self.sorted_by
}

fn bloom_filter_columns(&self) -> &[BloomFilterColumnConfig] {
&self.bloom_filter_columns
}
}
10 changes: 9 additions & 1 deletion crates/core/datasets-raw/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

use std::collections::BTreeMap;

use datasets_common::{block_num::BlockNum, manifest::TableSchema, network_id::NetworkId};
use datasets_common::{
block_num::BlockNum,
manifest::{BloomFilterColumnConfig, TableSchema},
network_id::NetworkId,
};

use crate::dataset_kind::{
EvmRpcDatasetKind, FirehoseDatasetKind, SolanaDatasetKind, TempoDatasetKind,
Expand Down Expand Up @@ -57,6 +61,10 @@ pub struct Table {
pub schema: TableSchema,
/// Network for this table.
pub network: NetworkId,
/// Columns to apply bloom filters to for row-group pruning.
/// Omit to disable bloom filters for this table.
#[serde(default)]
pub bloom_filter_columns: Vec<BloomFilterColumnConfig>,
Copy link
Copy Markdown
Contributor

@LNSD LNSD Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should validate that the columns in the vector match those in the table schema at manifest registration time

}

/// Schema generation types, gated behind the `schemars` feature.
Expand Down
3 changes: 0 additions & 3 deletions crates/core/worker-core/src/compaction/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use super::algorithm::Overflow;
pub struct ParquetConfig {
/// Compression algorithm: zstd, lz4, gzip, brotli, snappy, uncompressed (default: zstd(1))
pub compression: Compression,
/// Enable bloom filters (default: false)
pub bloom_filters: bool,
/// Parquet metadata cache size in MB (default: 1024)
pub cache_size_mb: u64,
/// Max row group size in MB (default: 512)
Expand All @@ -26,7 +24,6 @@ impl Default for ParquetConfig {
fn default() -> Self {
Self {
compression: Compression::ZSTD(ZstdLevel::default()),
bloom_filters: false,
cache_size_mb: 1024, // 1 GB
max_row_group_mb: 512, // 512 MB
target_size: SizeLimitConfig::default(),
Expand Down
Loading
Loading