From 580db28bdf13f62d16c429d0ed751b8ab31a2d08 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Thu, 26 Mar 2026 12:20:02 -0600 Subject: [PATCH] feat(common): inject _block_num statistics into physical plan Override _block_num column min/max on the Statistics passed to FileScanConfig in scan(), using synced_range. This is where DataFusion's AggregateStatistics optimizer reads from, enabling it to resolve MIN/MAX(_block_num) as constants without scanning parquet files. --- .../common/src/catalog/physical/snapshot.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/crates/core/common/src/catalog/physical/snapshot.rs b/crates/core/common/src/catalog/physical/snapshot.rs index bc36cf9f1..dfb12f860 100644 --- a/crates/core/common/src/catalog/physical/snapshot.rs +++ b/crates/core/common/src/catalog/physical/snapshot.rs @@ -5,7 +5,7 @@ use amp_parquet::reader; use datafusion::{ arrow::datatypes::SchemaRef, catalog::{Session, memory::DataSourceExec}, - common::{DFSchema, project_schema, stats::Precision}, + common::{DFSchema, ScalarValue, project_schema, stats::Precision}, datasource::{ TableProvider, TableType, create_ordering, listing::{ListingTableUrl, PartitionedFile}, @@ -342,9 +342,24 @@ impl TableProvider for QueryableSnapshot { let target_partitions = state.config_options().execution.target_partitions; let table_schema = self.physical_table.schema(); - let (file_groups, statistics) = self + let (file_groups, mut statistics) = self .resolve_file_groups(&segments, target_partitions, table_schema.clone()) .await?; + + // Override _block_num column statistics with exact min/max from synced_range. + // This enables the AggregateStatistics optimizer to resolve MIN/MAX(_block_num) + // as constants without scanning parquet files. + if let Some(range) = &self.synced_range + && let Ok(idx) = + table_schema.index_of(datasets_common::block_num::RESERVED_BLOCK_NUM_COLUMN_NAME) + { + statistics.column_statistics[idx].null_count = Precision::Exact(0); + statistics.column_statistics[idx].min_value = + Precision::Exact(ScalarValue::UInt64(Some(range.start()))); + statistics.column_statistics[idx].max_value = + Precision::Exact(ScalarValue::UInt64(Some(range.end()))); + } + if statistics.num_rows == Precision::Absent { tracing::warn!("Table has no row count statistics. Queries may be inefficient."); }