From ed27a73f94065bef3f9a53d40f51fb5a64bf00ab Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 18 Mar 2026 21:36:55 +0800 Subject: [PATCH 1/3] Refactor push down filter benchmarks to use dynamic sweep points --- .../core/benches/sql_planner_extended.rs | 212 +++++++++--------- 1 file changed, 112 insertions(+), 100 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index d4955313c79c3..213577d6c24a2 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -27,12 +27,17 @@ use datafusion_expr::{cast, col, lit, not, try_cast, when}; use datafusion_functions::expr_fn::{ btrim, length, regexp_like, regexp_replace, to_timestamp, upper, }; +use std::env; use std::fmt::Write; use std::hint::black_box; use std::ops::Rem; use std::sync::Arc; use tokio::runtime::Runtime; +const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60]; +const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3]; +const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)]; + // This benchmark suite is designed to test the performance of // logical planning with a large plan containing unions, many columns // with a variety of operations in it. 
@@ -324,6 +329,27 @@ fn build_non_case_left_join_df_with_push_down_filter( rt.block_on(async { ctx.sql(&query).await.unwrap() }) } +fn include_full_push_down_filter_sweep() -> bool { + env::var("DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP") + .map(|value| value == "1" || value.eq_ignore_ascii_case("true")) + .unwrap_or(false) +} + +fn push_down_filter_sweep_points() -> Vec<(usize, usize)> { + if include_full_push_down_filter_sweep() { + FULL_DEPTH_SWEEP + .into_iter() + .flat_map(|depth| { + FULL_PREDICATE_SWEEP + .into_iter() + .map(move |predicate_count| (predicate_count, depth)) + }) + .collect() + } else { + DEFAULT_SWEEP_POINTS.to_vec() + } +} + fn criterion_benchmark(c: &mut Criterion) { let baseline_ctx = SessionContext::new(); let case_heavy_ctx = SessionContext::new(); @@ -349,115 +375,101 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - let predicate_sweep = [10, 20, 30, 40, 60]; - let case_depth_sweep = [1, 2, 3]; + let sweep_points = push_down_filter_sweep_points(); let mut hotspot_group = c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab"); - for case_depth in case_depth_sweep { - for predicate_count in predicate_sweep { - let with_push_down_filter = - build_case_heavy_left_join_df_with_push_down_filter( - &rt, - predicate_count, - case_depth, - true, - ); - let without_push_down_filter = - build_case_heavy_left_join_df_with_push_down_filter( - &rt, - predicate_count, - case_depth, - false, - ); - - let input_label = - format!("predicates={predicate_count},case_depth={case_depth}"); - // A/B interpretation: - // - with_push_down_filter: default optimizer path (rule enabled) - // - without_push_down_filter: control path with the rule removed - // Compare both IDs at the same sweep point to isolate rule impact. 
- hotspot_group.bench_with_input( - BenchmarkId::new("with_push_down_filter", &input_label), - &with_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - hotspot_group.bench_with_input( - BenchmarkId::new("without_push_down_filter", &input_label), - &without_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, + hotspot_group.sample_size(10); + for &(predicate_count, case_depth) in &sweep_points { + let with_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter( + &rt, + predicate_count, + case_depth, + true, + ); + let without_push_down_filter = + build_case_heavy_left_join_df_with_push_down_filter( + &rt, + predicate_count, + case_depth, + false, ); - } + + let input_label = format!("predicates={predicate_count},case_depth={case_depth}"); + // A/B interpretation: + // - with_push_down_filter: default optimizer path (rule enabled) + // - without_push_down_filter: control path with the rule removed + // Compare both IDs at the same sweep point to isolate rule impact. 
+ hotspot_group.bench_with_input( + BenchmarkId::new("with_push_down_filter", &input_label), + &with_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); + hotspot_group.bench_with_input( + BenchmarkId::new("without_push_down_filter", &input_label), + &without_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); } hotspot_group.finish(); let mut control_group = c.benchmark_group("push_down_filter_control_non_case_left_join_ab"); - for nesting_depth in case_depth_sweep { - for predicate_count in predicate_sweep { - let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( - &rt, - predicate_count, - nesting_depth, - true, - ); - let without_push_down_filter = - build_non_case_left_join_df_with_push_down_filter( - &rt, - predicate_count, - nesting_depth, - false, - ); - - let input_label = - format!("predicates={predicate_count},nesting_depth={nesting_depth}"); - control_group.bench_with_input( - BenchmarkId::new("with_push_down_filter", &input_label), - &with_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - control_group.bench_with_input( - BenchmarkId::new("without_push_down_filter", &input_label), - &without_push_down_filter, - |b, df| { - b.iter(|| { - let df_clone = df.clone(); - black_box( - rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - }), - ); - }) - }, - ); - } + control_group.sample_size(10); + for &(predicate_count, nesting_depth) in &sweep_points { + let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( + &rt, + predicate_count, + nesting_depth, + true, + ); + let without_push_down_filter = 
build_non_case_left_join_df_with_push_down_filter( + &rt, + predicate_count, + nesting_depth, + false, + ); + + let input_label = + format!("predicates={predicate_count},nesting_depth={nesting_depth}"); + control_group.bench_with_input( + BenchmarkId::new("with_push_down_filter", &input_label), + &with_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); + control_group.bench_with_input( + BenchmarkId::new("without_push_down_filter", &input_label), + &without_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }, + ); } control_group.finish(); } From 5f10213fad1af08abba93f1f7d8d233a5aa7ff2d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 18 Mar 2026 21:46:58 +0800 Subject: [PATCH 2/3] Add dynamic sample size configuration for push down filter benchmarks --- datafusion/core/benches/sql_planner_extended.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 213577d6c24a2..97431929ff53f 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -37,6 +37,7 @@ use tokio::runtime::Runtime; const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60]; const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3]; const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)]; +const DEFAULT_SAMPLE_SIZE: usize = 10; // This benchmark suite is designed to test the performance of // logical planning with a large plan containing unions, many columns @@ -350,6 +351,14 @@ fn push_down_filter_sweep_points() -> Vec<(usize, usize)> { } } +fn push_down_filter_sample_size() -> usize { + env::var("DATAFUSION_PUSH_DOWN_FILTER_SAMPLE_SIZE") + .ok() + 
.and_then(|value| value.parse::<usize>().ok()) + .filter(|sample_size| *sample_size >= 10) + .unwrap_or(DEFAULT_SAMPLE_SIZE) +} + fn criterion_benchmark(c: &mut Criterion) { let baseline_ctx = SessionContext::new(); let case_heavy_ctx = SessionContext::new(); @@ -376,10 +385,11 @@ fn criterion_benchmark(c: &mut Criterion) { }); let sweep_points = push_down_filter_sweep_points(); + let sample_size = push_down_filter_sample_size(); let mut hotspot_group = c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab"); - hotspot_group.sample_size(10); + hotspot_group.sample_size(sample_size); for &(predicate_count, case_depth) in &sweep_points { let with_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter( &rt, @@ -429,7 +439,7 @@ fn criterion_benchmark(c: &mut Criterion) { let mut control_group = c.benchmark_group("push_down_filter_control_non_case_left_join_ab"); - control_group.sample_size(10); + control_group.sample_size(sample_size); for &(predicate_count, nesting_depth) in &sweep_points { let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( &rt, From 6cc60be20016a0674b255022e42fd5e3f638b679 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 18 Mar 2026 21:59:59 +0800 Subject: [PATCH 3/3] Remove unused sample size function and constant from push down filter benchmarks --- datafusion/core/benches/sql_planner_extended.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 97431929ff53f..ccd7d3f3f031b 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -37,7 +37,6 @@ use tokio::runtime::Runtime; const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60]; const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3]; const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)]; -const DEFAULT_SAMPLE_SIZE: usize = 10; // This benchmark 
suite is designed to test the performance of // logical planning with a large plan containing unions, many columns @@ -351,14 +350,6 @@ fn push_down_filter_sweep_points() -> Vec<(usize, usize)> { } } -fn push_down_filter_sample_size() -> usize { - env::var("DATAFUSION_PUSH_DOWN_FILTER_SAMPLE_SIZE") - .ok() - .and_then(|value| value.parse::<usize>().ok()) - .filter(|sample_size| *sample_size >= 10) - .unwrap_or(DEFAULT_SAMPLE_SIZE) -} - fn criterion_benchmark(c: &mut Criterion) { let baseline_ctx = SessionContext::new(); let case_heavy_ctx = SessionContext::new(); @@ -385,11 +376,8 @@ fn criterion_benchmark(c: &mut Criterion) { }); let sweep_points = push_down_filter_sweep_points(); - let sample_size = push_down_filter_sample_size(); - let mut hotspot_group = c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab"); - hotspot_group.sample_size(sample_size); for &(predicate_count, case_depth) in &sweep_points { let with_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter( &rt, @@ -439,7 +427,6 @@ fn criterion_benchmark(c: &mut Criterion) { let mut control_group = c.benchmark_group("push_down_filter_control_non_case_left_join_ab"); - control_group.sample_size(sample_size); for &(predicate_count, nesting_depth) in &sweep_points { let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter( &rt,