diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 9b91062657a07..5c240ba33ceda 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -32,7 +32,7 @@ use arrow::compute::{and, filter_record_batch}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::error::Result; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Constraints, DFSchema, SchemaExt, not_impl_err, plan_err}; use datafusion_common_runtime::JoinSet; use datafusion_datasource::memory::{MemSink, MemorySourceConfig}; @@ -46,8 +46,8 @@ use datafusion_physical_expr::{ use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning, - PhysicalExpr, PlanProperties, common, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, MappedExpr, + Partitioning, PhysicalExpr, PlanProperties, common, }; use datafusion_session::Session; @@ -643,4 +643,11 @@ impl ExecutionPlan for DmlResultExec { ) -> Result { Ok(TreeNodeRecursion::Continue) } + + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } } diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index 155c951fe5756..de93869f0de22 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use arrow::array::{ArrayRef, RecordBatch, UInt64Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalExpr}; @@ -32,8 +32,9 @@ use datafusion_physical_expr_common::sort_expr::{LexRequirement, OrderingRequire use datafusion_physical_plan::metrics::MetricsSet; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning, - PlanProperties, SendableRecordBatchStream, execute_input_stream, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, MappedExpr, + Partitioning, PlanProperties, RecomputePropertiesBehavior, SendableRecordBatchStream, + execute_input_stream, }; use async_trait::async_trait; @@ -233,6 +234,53 @@ impl ExecutionPlan for DataSinkExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let Some(sort_order) = &self.sort_order else { + return Ok(Transformed::no(self)); + }; + + let mut any_transformed = false; + let mut any_recompute = false; + let mut new_reqs = Vec::with_capacity(sort_order.len()); + for req in sort_order.iter() { + let mapped = f(Arc::clone(&req.expr))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_reqs.push( + datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement { + expr: mapped.expr.data, + options: req.options, + }, + ); + } + + if !any_transformed { + return Ok(Transformed::no(self)); + } + + let new_sort_order = LexRequirement::new(new_reqs); + let new_node = if any_recompute { + DataSinkExec::new( + Arc::clone(&self.input), + Arc::clone(&self.sink), + new_sort_order, + ) + } else { + DataSinkExec { + sort_order: new_sort_order, + ..(*self).clone() + } + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + /// Execute the plan and return a stream of `RecordBatch`es for /// the specified partition. fn execute( diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index bffd94af210a7..e82f8b50fa771 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -31,13 +31,13 @@ use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::stream::BatchSplitStream; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, MappedExpr, PlanProperties, }; use itertools::Itertools; use crate::file_scan_config::FileScanConfig; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Constraints, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; @@ -303,6 +303,13 @@ impl ExecutionPlan for DataSourceExec { self.data_source.apply_expressions(f) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 064e4a895317b..d4296476c48a3 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -21,11 +21,12 @@ use std::sync::Arc; use abi_stable::StableAbi; use abi_stable::std_types::{RString, RVec}; +use datafusion_common::tree_node::Transformed; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, MappedExpr, PlanProperties, }; use tokio::runtime::Handle; @@ -310,6 +311,15 @@ impl ExecutionPlan for ForeignExecutionPlan { } Ok(tnr) } + + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut( + Arc, + ) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } } #[cfg(test)] @@ -400,6 +410,15 @@ pub(crate) mod tests { } Ok(tnr) } + + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut( + Arc, + ) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } } #[test] diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 8b71fc9fbf74a..d73d85793f2bc 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -41,8 +41,8 @@ use datafusion_physical_plan::projection::{ use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, - SendableRecordBatchStream, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, MappedExpr, + PlanProperties, SendableRecordBatchStream, }; /// This rule either adds or removes [`OutputRequirements`]s to/from the physical @@ -328,6 +328,15 @@ impl ExecutionPlan for OutputRequirementExec { Ok(tnr) } + + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut( + Arc, + ) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } } impl PhysicalOptimizerRule for OutputRequirements { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 42df1a8b07cd4..f6fd0850f1eb2 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -32,8 +32,9 @@ use crate::filter_pushdown::{ }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ - DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, - SendableRecordBatchStream, Statistics, check_if_same_properties, + DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, MappedExpr, + RecomputePropertiesBehavior, SendableRecordBatchStream, Statistics, + check_if_same_properties, }; use datafusion_common::config::ConfigOptions; use datafusion_physical_expr::utils::collect_columns; @@ -45,7 +46,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_schema::FieldRef; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{ Constraint, Constraints, Result, ScalarValue, assert_eq_or_internal_err, not_impl_err, }; @@ -1414,6 +1415,123 @@ impl ExecutionPlan for AggregateExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mut any_transformed = false; + let mut any_recompute = false; + + // Map group_by expressions + let mut new_group_exprs: Vec<(Arc, String)> = + Vec::with_capacity(self.group_by.expr.len()); + for (expr, alias) in self.group_by.expr.iter() { + let mapped = f(Arc::clone(expr))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_group_exprs.push((mapped.expr.data, alias.clone())); + } + + // Map aggr_expr (args + order_by expressions) + let mut new_aggr_exprs: Vec> = + Vec::with_capacity(self.aggr_expr.len()); + for aggr in self.aggr_expr.iter() { + let all = aggr.all_expressions(); + let mut new_args: Vec> = + Vec::with_capacity(all.args.len()); + let mut new_order_bys: Vec> = + Vec::with_capacity(all.order_by_exprs.len()); + for arg in all.args { + let mapped = f(arg)?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_args.push(mapped.expr.data); + } + for ob_expr in all.order_by_exprs { + let mapped = f(ob_expr)?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_order_bys.push(mapped.expr.data); + } + let new_aggr = aggr + .with_new_expressions(new_args, new_order_bys) + .unwrap_or_else(|| (**aggr).clone()); + new_aggr_exprs.push(Arc::new(new_aggr)); + } + + // Map filter expressions (FILTER WHERE clauses) + let mut new_filter_exprs: Vec>> = + Vec::with_capacity(self.filter_expr.len()); + for filter_opt in self.filter_expr.iter() { + match filter_opt { + None => new_filter_exprs.push(None), + Some(filter) => { + let mapped = f(Arc::clone(filter))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_filter_exprs.push(Some(mapped.expr.data)); + } + } + } + + if !any_transformed { + return Ok(Transformed::no(self)); + } + + let new_group_by = PhysicalGroupBy::new( + new_group_exprs, + self.group_by.null_expr.clone(), + self.group_by.groups.clone(), + self.group_by.has_grouping_set, + ); + + if any_recompute { + let mut new_node = AggregateExec::try_new( + self.mode, + new_group_by, + new_aggr_exprs, + new_filter_exprs, + Arc::clone(&self.input), + Arc::clone(&self.input_schema), + )?; + new_node.limit_options = self.limit_options; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + let new_node = AggregateExec { + mode: self.mode, + group_by: Arc::new(new_group_by), + aggr_expr: new_aggr_exprs.into(), + filter_expr: new_filter_exprs.into(), + limit_options: self.limit_options, + input: Arc::clone(&self.input), + schema: Arc::clone(&self.schema), + input_schema: Arc::clone(&self.input_schema), + metrics: ExecutionPlanMetricsSet::new(), + required_input_ordering: self.required_input_ordering.clone(), + input_order_mode: self.input_order_mode.clone(), + cache: Arc::clone(&self.cache), + dynamic_filter: self.dynamic_filter.clone(), + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn with_new_children( self: Arc, children: Vec>, @@ -2518,6 +2636,13 @@ mod tests { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 4aa78055daee3..1a9b1f805e687 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -27,11 +27,11 @@ use super::{ }; use crate::display::DisplayableExecutionPlan; use crate::metrics::MetricType; -use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; +use crate::{DisplayFormatType, ExecutionPlan, MappedExpr, Partitioning}; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::instant::Instant; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; @@ -152,6 +152,13 @@ impl ExecutionPlan for AnalyzeExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index abfe870f52665..066166d3adf26 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -19,8 +19,8 @@ use crate::coalesce::LimitedBatchCoalescer; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::stream::RecordBatchStreamAdapter; use crate::{ - DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, - check_if_same_properties, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, MappedExpr, + PlanProperties, check_if_same_properties, }; use arrow::array::RecordBatch; use arrow_schema::{Fields, Schema, SchemaRef}; @@ -177,6 +177,13 @@ impl ExecutionPlan for AsyncFuncExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 3e85fb32d2f2c..315497af43f94 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -26,12 +26,12 @@ use crate::filter_pushdown::{ use crate::projection::ProjectionExec; use crate::stream::RecordBatchStreamAdapter; use crate::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SortOrderPushdownResult, - check_if_same_properties, + DisplayAs, DisplayFormatType, ExecutionPlan, MappedExpr, PlanProperties, + SortOrderPushdownResult, check_if_same_properties, }; use arrow::array::RecordBatch; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Result, Statistics, internal_err, plan_err}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -180,6 +180,13 @@ impl ExecutionPlan for BufferExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 3e8bfc7f81724..97431aa0362e0 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -26,14 +26,14 @@ use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics}; use crate::projection::ProjectionExec; use crate::{ - DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, - check_if_same_properties, + DisplayFormatType, ExecutionPlan, MappedExpr, RecordBatchStream, + SendableRecordBatchStream, check_if_same_properties, }; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -195,6 +195,13 @@ impl ExecutionPlan for CoalesceBatchesExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 5ea3589f22b3e..f8825d595ddc3 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -31,11 +31,13 @@ use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::projection::{ProjectionExec, make_with_child}; use crate::sort_pushdown::SortOrderPushdownResult; -use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties}; +use crate::{ + DisplayFormatType, ExecutionPlan, MappedExpr, Partitioning, check_if_same_properties, +}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -166,6 +168,13 @@ impl ExecutionPlan for CoalescePartitionsExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index efe6506edd7bd..3c1918f256306 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -71,7 +71,7 @@ //! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties). use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_physical_expr::PhysicalExpr; #[cfg(datafusion_coop = "tokio_fallback")] use futures::Future; @@ -87,8 +87,9 @@ use crate::filter_pushdown::{ }; use crate::projection::ProjectionExec; use crate::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, SortOrderPushdownResult, check_if_same_properties, + DisplayAs, DisplayFormatType, ExecutionPlan, MappedExpr, PlanProperties, + RecordBatchStream, SendableRecordBatchStream, SortOrderPushdownResult, + check_if_same_properties, }; use arrow::record_batch::RecordBatch; use arrow_schema::Schema; @@ -289,6 +290,13 @@ impl ExecutionPlan for CooperativeExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index aaf83345d99b8..102df5793827b 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1122,12 +1122,13 @@ mod tests { use std::sync::Arc; use datafusion_common::{ - Result, Statistics, internal_datafusion_err, tree_node::TreeNodeRecursion, + Result, Statistics, internal_datafusion_err, + tree_node::{Transformed, TreeNodeRecursion}, }; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::PhysicalExpr; - use crate::{DisplayAs, ExecutionPlan, PlanProperties}; + use crate::{DisplayAs, ExecutionPlan, MappedExpr, PlanProperties}; use super::DisplayableExecutionPlan; @@ -1179,6 +1180,13 @@ mod tests { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn execute( &self, _: usize, diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 078bc4b8d064b..2c481f9769f93 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -23,14 +23,14 @@ use std::sync::Arc; use crate::memory::MemoryStream; use crate::{DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; use crate::{ - DisplayFormatType, ExecutionPlan, Partitioning, + DisplayFormatType, ExecutionPlan, MappedExpr, Partitioning, execution_plan::{Boundedness, EmissionType}, }; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{ColumnStatistics, Result, ScalarValue, assert_or_internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; @@ -131,6 +131,13 @@ impl ExecutionPlan for EmptyExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 8df33452e096d..25a7c48b046f1 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -65,6 +65,45 @@ use datafusion_physical_expr_common::sort_expr::{ use futures::stream::{StreamExt, TryStreamExt}; +/// Controls whether plan properties (schema, ordering, partitioning) are +/// recomputed after expressions are replaced by [`ExecutionPlan::map_expressions`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RecomputePropertiesBehavior { + /// Recompute plan properties after expression replacement. + /// Use this when the transformation may change the semantics of the expressions + /// (e.g. changing column references, output types, sort keys). + Recompute, + /// Reuse the existing cached plan properties without recomputation. + /// Use this when the transformation preserves the plan's semantics + /// (e.g. substituting placeholder expressions with concrete values). + /// The caller is responsible for ensuring that properties remain valid. + Preserve, +} + +/// Return type of the closure passed to [`ExecutionPlan::map_expressions`]. +/// Wraps a transformed expression together with a directive on whether the +/// plan's cached properties should be recomputed after the replacement. +pub struct MappedExpr { + pub expr: Transformed>, + pub recompute: RecomputePropertiesBehavior, +} + +impl MappedExpr { + pub fn recompute(expr: Transformed>) -> Self { + Self { + expr, + recompute: RecomputePropertiesBehavior::Recompute, + } + } + + pub fn preserve(expr: Transformed>) -> Self { + Self { + expr, + recompute: RecomputePropertiesBehavior::Preserve, + } + } +} + /// Represent nodes in the DataFusion Physical Plan. /// /// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of @@ -281,6 +320,114 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, ) -> Result; + /// Apply a transformation to all expressions owned directly by this node, + /// returning a new node with the transformed expressions. + /// + /// This is the write counterpart to [`Self::apply_expressions`]. Like + /// `apply_expressions`, this only visits expressions directly owned by the + /// node, not expressions in child nodes. + /// + /// The closure receives each expression and returns a [`MappedExpr`] that + /// contains the (possibly transformed) expression and a + /// [`RecomputePropertiesBehavior`] directive. If **any** expression signals + /// [`RecomputePropertiesBehavior::Recompute`], the implementor must + /// recompute its [`PlanProperties`] (schema, ordering, partitioning). + /// Otherwise the existing cached properties may be reused. + /// + /// Use [`RecomputePropertiesBehavior::Preserve`] when the transformation + /// does not change which columns are referenced or their types (e.g. + /// substituting a placeholder `$1` with a concrete value `42`). + /// + /// Use [`RecomputePropertiesBehavior::Recompute`] when the transformation + /// may change the semantics of the expression in a way that affects plan + /// properties (e.g. replacing a sort key column reference `a` with `b`, + /// which invalidates the cached output ordering). + /// + /// Returns [`Transformed::no`] if no expression was changed, or + /// [`Transformed::yes`] with the new node if any expression was replaced. + /// + /// # Example Usage + /// + /// ``` + /// # use std::sync::Arc; + /// # use datafusion_physical_plan::{ExecutionPlan, MappedExpr, RecomputePropertiesBehavior}; + /// # use datafusion_common::tree_node::Transformed; + /// # fn example(plan: Arc) -> datafusion_common::Result<()> { + /// // Substitute placeholder expressions, preserving plan properties. + /// let result = plan.map_expressions(&mut |expr| { + /// Ok(MappedExpr::preserve(Transformed::no(expr))) + /// })?; + /// # Ok(()) + /// # } + /// ``` + /// + /// See also the unit tests in this module for examples of the `Preserve` + /// and `Recompute` paths: + /// [`test_map_expressions_preserve_reuses_cache`] and + /// [`test_map_expressions_recompute_produces_new_cache`]. + /// + /// # Implementation Examples + /// + /// ## Node with no expressions (e.g., EmptyExec, CoalescePartitionsExec) + /// ```ignore + /// fn map_expressions( + /// self: Arc, + /// _f: &mut dyn FnMut(Arc) -> Result, + /// ) -> Result>> { + /// Ok(Transformed::no(self)) + /// } + /// ``` + /// + /// ## Node with a single expression (e.g., FilterExec) + /// ```ignore + /// fn map_expressions( + /// self: Arc, + /// f: &mut dyn FnMut(Arc) -> Result, + /// ) -> Result>> { + /// let mapped = f(Arc::clone(&self.predicate))?; + /// if !mapped.expr.transformed { + /// return Ok(Transformed::no(self)); + /// } + /// if mapped.recompute == RecomputePropertiesBehavior::Recompute { + /// // rebuild node using constructor, which calls compute_properties + /// Ok(Transformed::yes(Arc::new(MyExec::new(mapped.expr.data, self.input)?))) + /// } else { + /// // reuse cached properties + /// Ok(Transformed::yes(Arc::new(MyExec { predicate: mapped.expr.data, ..(*self).clone() }))) + /// } + /// } + /// ``` + /// + /// ## Node with multiple expressions (e.g., SortExec, ProjectionExec) + /// ```ignore + /// fn map_expressions( + /// self: Arc, + /// f: &mut dyn FnMut(Arc) -> Result, + /// ) -> Result>> { + /// let mut any_transformed = false; + /// let mut any_recompute = false; + /// let mut new_exprs = Vec::with_capacity(self.exprs.len()); + /// for expr in &self.exprs { + /// let mapped = f(Arc::clone(expr))?; + /// any_transformed |= mapped.expr.transformed; + /// any_recompute |= mapped.recompute == RecomputePropertiesBehavior::Recompute; + /// new_exprs.push(mapped.expr.data); + /// } + /// if !any_transformed { + /// return Ok(Transformed::no(self)); + /// } + /// if any_recompute { + /// Ok(Transformed::yes(Arc::new(MyExec::new(new_exprs, self.input)?))) + /// } else { + /// Ok(Transformed::yes(Arc::new(MyExec { exprs: new_exprs, cache: Arc::clone(&self.cache), ..(*self).clone() }))) + /// } + /// } + /// ``` + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>>; + /// Returns a new `ExecutionPlan` where all existing children were replaced /// by the `children`, in order fn with_new_children( @@ -1639,6 +1786,13 @@ mod tests { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn execute( &self, _partition: usize, @@ -1705,6 +1859,13 @@ mod tests { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, _: Vec>, @@ -1729,10 +1890,25 @@ mod tests { } /// A test node that holds a fixed list of expressions, used to test - /// `apply_expressions` behavior. + /// `apply_expressions` and `map_expressions` behavior. #[derive(Debug)] struct MultiExprExec { exprs: Vec>, + cache: Arc, + } + + impl MultiExprExec { + fn new(exprs: Vec>) -> Self { + Self { + exprs, + cache: Arc::new(PlanProperties::new( + EquivalenceProperties::new(Arc::new(Schema::empty())), + Partitioning::UnknownPartitioning(1), + EmissionType::Final, + Boundedness::Bounded, + )), + } + } } impl DisplayAs for MultiExprExec { @@ -1754,8 +1930,8 @@ mod tests { self } - fn properties(&self) -> &std::sync::Arc { - unimplemented!() + fn properties(&self) -> &Arc { + &self.cache } fn children(&self) -> Vec<&Arc> { @@ -1780,6 +1956,42 @@ mod tests { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mut any_transformed = false; + let mut any_recompute = false; + let mut new_exprs = Vec::with_capacity(self.exprs.len()); + for expr in &self.exprs { + let mapped = f(Arc::clone(expr))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_exprs.push(mapped.expr.data); + } + if !any_transformed { + return Ok(Transformed::no(self)); + } + let new_node = if any_recompute { + // Recompute properties — in a real node this would re-derive + // schema, ordering, partitioning etc. from the new expressions. + // Here we just produce a fresh stub to show the path is taken. + MultiExprExec::new(new_exprs) + } else { + // Preserve properties — reuse the existing cache since the + // transformation does not affect plan properties. + MultiExprExec { + exprs: new_exprs, + cache: Arc::clone(&self.cache), + } + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + fn execute( &self, _partition: usize, @@ -1807,9 +2019,7 @@ mod tests { /// `apply_expressions` visits all expressions when `f` always returns `Continue`. #[test] fn test_apply_expressions_continue_visits_all() -> Result<()> { - let plan = MultiExprExec { - exprs: vec![lit_expr(1), lit_expr(2), lit_expr(3)], - }; + let plan = MultiExprExec::new(vec![lit_expr(1), lit_expr(2), lit_expr(3)]); let mut visited = 0usize; plan.apply_expressions(&mut |_expr| { visited += 1; @@ -1821,9 +2031,7 @@ mod tests { #[test] fn test_apply_expressions_stop_halts_early() -> Result<()> { - let plan = MultiExprExec { - exprs: vec![lit_expr(1), lit_expr(2), lit_expr(3)], - }; + let plan = MultiExprExec::new(vec![lit_expr(1), lit_expr(2), lit_expr(3)]); let mut visited = 0usize; let tnr = plan.apply_expressions(&mut |_expr| { visited += 1; @@ -1835,6 +2043,71 @@ mod tests { Ok(()) } + #[test] + fn test_map_expressions_no_change() -> Result<()> { + let plan: Arc = Arc::new(MultiExprExec::new(vec![ + lit_expr(1), + lit_expr(2), + lit_expr(3), + ])); + let result = plan.map_expressions(&mut |expr| { + Ok(MappedExpr::preserve(Transformed::no(expr))) + })?; + assert!(!result.transformed); + Ok(()) + } + + #[test] + fn test_map_expressions_with_change() -> Result<()> { + let plan: Arc = + Arc::new(MultiExprExec::new(vec![lit_expr(1), lit_expr(2)])); + // Replace every expression with lit(99) + let result = plan.map_expressions(&mut |_expr| { + Ok(MappedExpr::preserve(Transformed::yes(lit_expr(99)))) + })?; + assert!(result.transformed); + // The new node should have 2 expressions, both lit(99) + let mut visited = 0usize; + result.data.apply_expressions(&mut |_| { + visited += 1; + Ok(TreeNodeRecursion::Continue) + })?; + assert_eq!(visited, 2); + Ok(()) + } + + /// `map_expressions` with `Preserve` reuses the existing cache pointer. + #[test] + fn test_map_expressions_preserve_reuses_cache() -> Result<()> { + let plan: Arc = + Arc::new(MultiExprExec::new(vec![lit_expr(1)])); + let original_cache = Arc::clone(plan.properties()); + + let result = plan.map_expressions(&mut |_expr| { + Ok(MappedExpr::preserve(Transformed::yes(lit_expr(99)))) + })?; + assert!(result.transformed); + // Cache pointer is the same — no recomputation occurred. + assert!(Arc::ptr_eq(&original_cache, result.data.properties())); + Ok(()) + } + + /// `map_expressions` with `Recompute` produces a fresh cache. + #[test] + fn test_map_expressions_recompute_produces_new_cache() -> Result<()> { + let plan: Arc = + Arc::new(MultiExprExec::new(vec![lit_expr(1)])); + let original_cache = Arc::clone(plan.properties()); + + let result = plan.map_expressions(&mut |_expr| { + Ok(MappedExpr::recompute(Transformed::yes(lit_expr(99)))) + })?; + assert!(result.transformed); + // Cache pointer differs — properties were recomputed. + assert!(!Arc::ptr_eq(&original_cache, result.data.properties())); + Ok(()) + } + #[test] fn test_execution_plan_name() { let schema1 = Arc::new(Schema::empty()); diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index fa684f3483a83..0bbefb32da101 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -23,11 +23,11 @@ use std::sync::Arc; use super::{DisplayAs, PlanProperties, SendableRecordBatchStream}; use crate::execution_plan::{Boundedness, EmissionType}; use crate::stream::RecordBatchStreamAdapter; -use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; +use crate::{DisplayFormatType, ExecutionPlan, MappedExpr, Partitioning}; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::display::StringifiedPlan; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; @@ -129,6 +129,13 @@ impl ExecutionPlan for ExplainExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 141d9c38469d8..e85971e4c8057 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -41,7 +41,7 @@ use crate::projection::{ try_embed_projection, update_expr, }; use crate::{ - DisplayFormatType, ExecutionPlan, + DisplayFormatType, ExecutionPlan, MappedExpr, RecomputePropertiesBehavior, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics}, }; @@ -51,7 +51,7 @@ use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{ DataFusionError, Result, ScalarValue, internal_err, plan_err, project_schema, }; @@ -535,6 +535,29 @@ impl ExecutionPlan for FilterExec { f(self.predicate.as_ref()) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mapped = f(Arc::clone(&self.predicate))?; + if !mapped.expr.transformed { + return Ok(Transformed::no(self)); + } + let new_predicate = mapped.expr.data; + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + let new_node = FilterExecBuilder::from(&*self) + .with_predicate(new_predicate) + .build()?; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + let new_node = FilterExec { + predicate: new_predicate, + ..(*self).clone() + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn maintains_input_order(&self) -> Vec { // Tell optimizer this operator doesn't reorder its input vec![true] diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index a895f69dc5138..d73ac9c240b3b 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -33,7 +33,7 @@ use crate::projection::{ }; use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, - ExecutionPlanProperties, PlanProperties, RecordBatchStream, + ExecutionPlanProperties, MappedExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, check_if_same_properties, handle_state, }; @@ -41,7 +41,7 @@ use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::compute::concat_batches; use arrow::datatypes::{Fields, Schema, SchemaRef}; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{ JoinType, Result, ScalarValue, assert_eq_or_internal_err, internal_err, }; @@ -295,6 +295,13 @@ impl ExecutionPlan for CrossJoinExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index a03cc36958fc1..8524054610bab 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -54,8 +54,8 @@ use crate::projection::{ use crate::repartition::REPARTITION_RANDOM_STATE; use crate::spill::get_record_batch_memory_size; use crate::{ - DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, - PlanProperties, SendableRecordBatchStream, Statistics, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, MappedExpr, Partitioning, + PlanProperties, RecomputePropertiesBehavior, SendableRecordBatchStream, Statistics, common::can_project, joins::utils::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, @@ -72,7 +72,7 @@ use arrow::record_batch::RecordBatch; use arrow::util::bit_util; use arrow_schema::{DataType, Schema}; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err, @@ -1259,6 +1259,88 @@ impl ExecutionPlan for HashJoinExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mut any_transformed = false; + let mut any_recompute = false; + + // Map join key expressions (on clauses) + let mut new_on: Vec<(PhysicalExprRef, PhysicalExprRef)> = + Vec::with_capacity(self.on.len()); + for (left, right) in self.on.iter() { + let mapped_left = f(Arc::clone(left))?; + if mapped_left.expr.transformed { + any_transformed = true; + } + if mapped_left.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + let mapped_right = f(Arc::clone(right))?; + if mapped_right.expr.transformed { + any_transformed = true; + } + if mapped_right.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_on.push((mapped_left.expr.data, mapped_right.expr.data)); + } + + // Map join filter expression if present + let new_filter = if let Some(filter) = &self.filter { + let mapped = f(Arc::clone(filter.expression()))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + Some(JoinFilter::new( + mapped.expr.data, + filter.column_indices.clone(), + Arc::clone(&filter.schema), + )) + } else { + None + }; + + if !any_transformed { + return Ok(Transformed::no(self)); + } + + if any_recompute { + let new_node = self + .builder() + .with_on(new_on) + .with_filter(new_filter) + .recompute_properties() + .build_exec()?; + Ok(Transformed::yes(new_node)) + } else { + let new_node = HashJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: new_on, + filter: new_filter, + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + left_fut: Arc::clone(&self.left_fut), + random_state: self.random_state.clone(), + mode: self.mode, + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + null_aware: self.null_aware, + cache: Arc::clone(&self.cache), + dynamic_filter: None, + fetch: self.fetch, + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + /// Creates a new HashJoinExec with different children while preserving configuration. /// /// This method is called during query optimization when the optimizer creates new diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index f84cb54dac948..559f39c7303c7 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -45,8 +45,8 @@ use crate::projection::{ }; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, - check_if_same_properties, + MappedExpr, PlanProperties, RecomputePropertiesBehavior, RecordBatchStream, + SendableRecordBatchStream, check_if_same_properties, }; use arrow::array::{ @@ -61,7 +61,7 @@ use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_schema::DataType; use datafusion_common::cast::as_boolean_array; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{ JoinSide, Result, ScalarValue, Statistics, arrow_err, assert_eq_or_internal_err, internal_datafusion_err, internal_err, project_schema, unwrap_or_internal_err, @@ -569,6 +569,47 @@ impl ExecutionPlan for NestedLoopJoinExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let Some(filter) = &self.filter else { + return Ok(Transformed::no(self)); + }; + + let mapped = f(Arc::clone(filter.expression()))?; + if !mapped.expr.transformed { + return Ok(Transformed::no(self)); + } + + let new_filter = Some(JoinFilter::new( + mapped.expr.data, + filter.column_indices.clone(), + Arc::clone(&filter.schema), + )); + + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + let new_node = NestedLoopJoinExecBuilder::from(&*self) + .with_filter(new_filter) + .build()?; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + let new_node = NestedLoopJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + filter: new_filter, + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + build_side_data: Default::default(), + column_indices: self.column_indices.clone(), + projection: self.projection.clone(), + metrics: Default::default(), + cache: Arc::clone(&self.cache), + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs index fb1c4b160528d..99860a620cb6d 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs @@ -23,7 +23,7 @@ use arrow::{ }; use arrow_schema::{SchemaRef, SortOptions}; use datafusion_common::not_impl_err; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{JoinSide, Result, internal_err}; use datafusion_execution::{ SendableRecordBatchStream, @@ -53,7 +53,8 @@ use crate::joins::piecewise_merge_join::utils::{ use crate::joins::utils::asymmetric_join_output_partitioning; use crate::metrics::MetricsSet; use crate::{ - DisplayAs, DisplayFormatType, ExecutionPlanProperties, check_if_same_properties, + DisplayAs, DisplayFormatType, ExecutionPlanProperties, MappedExpr, + RecomputePropertiesBehavior, check_if_same_properties, }; use crate::{ ExecutionPlan, PlanProperties, @@ -520,6 +521,68 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { f(self.on.0.as_ref())?.visit_sibling(|| f(self.on.1.as_ref())) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mapped_left = f(Arc::clone(&self.on.0))?; + let mapped_right = f(Arc::clone(&self.on.1))?; + + let any_transformed = + mapped_left.expr.transformed || mapped_right.expr.transformed; + if !any_transformed { + return Ok(Transformed::no(self)); + } + + let any_recompute = mapped_left.recompute + == RecomputePropertiesBehavior::Recompute + || mapped_right.recompute == RecomputePropertiesBehavior::Recompute; + + let new_on = (mapped_left.expr.data, mapped_right.expr.data); + + if any_recompute { + let new_node = PiecewiseMergeJoinExec::try_new( + Arc::clone(&self.buffered), + Arc::clone(&self.streamed), + new_on, + self.operator, + self.join_type, + self.num_partitions, + )?; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + // Rebuild sort exprs from new on + existing sort options + let new_left_sort_exprs = vec![PhysicalSortExpr::new( + Arc::clone(&new_on.0), + self.sort_options, + )]; + let new_right_sort_exprs = vec![PhysicalSortExpr::new( + Arc::clone(&new_on.1), + self.sort_options, + )]; + let new_left_lex = LexOrdering::new(new_left_sort_exprs) + .unwrap_or_else(|| self.left_child_plan_required_order.clone()); + let new_right_lex = LexOrdering::new(new_right_sort_exprs) + .unwrap_or_else(|| self.right_batch_required_orders.clone()); + let new_node = PiecewiseMergeJoinExec { + buffered: Arc::clone(&self.buffered), + streamed: Arc::clone(&self.streamed), + on: new_on, + operator: self.operator, + join_type: self.join_type, + schema: Arc::clone(&self.schema), + buffered_fut: Default::default(), + metrics: ExecutionPlanMetricsSet::new(), + left_child_plan_required_order: new_left_lex, + right_batch_required_orders: new_right_lex, + sort_options: self.sort_options, + cache: Arc::clone(&self.cache), + num_partitions: self.num_partitions, + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn required_input_distribution(&self) -> Vec { vec![ Distribution::SinglePartition, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index ac077792f592c..3c7bb56bdbc3c 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -39,12 +39,13 @@ use crate::projection::{ }; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, SendableRecordBatchStream, Statistics, check_if_same_properties, + MappedExpr, PlanProperties, RecomputePropertiesBehavior, SendableRecordBatchStream, + Statistics, check_if_same_properties, }; use arrow::compute::SortOptions; use arrow::datatypes::SchemaRef; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{ JoinSide, JoinType, NullEquality, Result, assert_eq_or_internal_err, internal_err, plan_err, @@ -468,6 +469,103 @@ impl ExecutionPlan for SortMergeJoinExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mut any_transformed = false; + let mut any_recompute = false; + + // Map join key expressions (on clauses) + let mut new_on: JoinOn = Vec::with_capacity(self.on.len()); + for (left, right) in self.on.iter() { + let mapped_left = f(Arc::clone(left))?; + if mapped_left.expr.transformed { + any_transformed = true; + } + if mapped_left.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + let mapped_right = f(Arc::clone(right))?; + if mapped_right.expr.transformed { + any_transformed = true; + } + if mapped_right.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_on.push((mapped_left.expr.data, mapped_right.expr.data)); + } + + // Map join filter expression if present + let new_filter = if let Some(filter) = &self.filter { + let mapped = f(Arc::clone(filter.expression()))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + Some(JoinFilter::new( + mapped.expr.data, + filter.column_indices.clone(), + Arc::clone(&filter.schema), + )) + } else { + None + }; + + if !any_transformed { + return Ok(Transformed::no(self)); + } + + if any_recompute { + let new_node = SortMergeJoinExec::try_new( + Arc::clone(&self.left), + Arc::clone(&self.right), + new_on, + new_filter, + self.join_type, + self.sort_options.clone(), + self.null_equality, + )?; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + // Rebuild sort exprs from new on + existing sort options + let (new_left_sort_exprs, new_right_sort_exprs): (Vec<_>, Vec<_>) = new_on + .iter() + .zip(self.sort_options.iter()) + .map(|((l, r), sort_op)| { + let left = PhysicalSortExpr { + expr: Arc::clone(l), + options: *sort_op, + }; + let right = PhysicalSortExpr { + expr: Arc::clone(r), + options: *sort_op, + }; + (left, right) + }) + .unzip(); + let new_node = SortMergeJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: new_on, + filter: new_filter, + join_type: self.join_type, + schema: Arc::clone(&self.schema), + metrics: ExecutionPlanMetricsSet::new(), + left_sort_exprs: LexOrdering::new(new_left_sort_exprs) + .unwrap_or_else(|| self.left_sort_exprs.clone()), + right_sort_exprs: LexOrdering::new(new_right_sort_exprs) + .unwrap_or_else(|| self.right_sort_exprs.clone()), + sort_options: self.sort_options.clone(), + null_equality: self.null_equality, + cache: Arc::clone(&self.cache), + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index f31cd8d446de2..e8130addb01cd 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -53,7 +53,8 @@ use crate::projection::{ }; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, + MappedExpr, PlanProperties, RecomputePropertiesBehavior, RecordBatchStream, + SendableRecordBatchStream, joins::StreamJoinPartitionMode, metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; @@ -66,7 +67,7 @@ use arrow::compute::concat_batches; use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::hash_utils::create_hashes; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::utils::bisect; use datafusion_common::{ HashSet, JoinSide, JoinType, NullEquality, Result, assert_eq_or_internal_err, @@ -482,6 +483,89 @@ impl ExecutionPlan for SymmetricHashJoinExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mut any_transformed = false; + let mut any_recompute = false; + + // Map join key expressions (on clauses) + let mut new_on: Vec<(PhysicalExprRef, PhysicalExprRef)> = + Vec::with_capacity(self.on.len()); + for (left, right) in self.on.iter() { + let mapped_left = f(Arc::clone(left))?; + if mapped_left.expr.transformed { + any_transformed = true; + } + if mapped_left.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + let mapped_right = f(Arc::clone(right))?; + if mapped_right.expr.transformed { + any_transformed = true; + } + if mapped_right.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_on.push((mapped_left.expr.data, mapped_right.expr.data)); + } + + // Map join filter expression if present + let new_filter = if let Some(filter) = &self.filter { + let mapped = f(Arc::clone(filter.expression()))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + Some(JoinFilter::new( + mapped.expr.data, + filter.column_indices.clone(), + Arc::clone(&filter.schema), + )) + } else { + None + }; + + if !any_transformed { + return Ok(Transformed::no(self)); + } + + if any_recompute { + let new_node = SymmetricHashJoinExec::try_new( + Arc::clone(&self.left), + Arc::clone(&self.right), + new_on, + new_filter, + &self.join_type, + self.null_equality, + self.left_sort_exprs.clone(), + self.right_sort_exprs.clone(), + self.mode, + )?; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + let new_node = SymmetricHashJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: new_on, + filter: new_filter, + join_type: self.join_type, + random_state: self.random_state.clone(), + metrics: ExecutionPlanMetricsSet::new(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + left_sort_exprs: self.left_sort_exprs.clone(), + right_sort_exprs: self.right_sort_exprs.clone(), + mode: self.mode, + cache: Arc::clone(&self.cache), + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 6467d7a2e389d..5ca15c6934f2a 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -42,9 +42,10 @@ pub use datafusion_physical_expr::{ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; pub use crate::execution_plan::{ - ExecutionPlan, ExecutionPlanProperties, PlanProperties, collect, collect_partitioned, - displayable, execute_input_stream, execute_stream, execute_stream_partitioned, - get_plan_string, with_new_children_if_necessary, + ExecutionPlan, ExecutionPlanProperties, MappedExpr, PlanProperties, + RecomputePropertiesBehavior, collect, collect_partitioned, displayable, + execute_input_stream, execute_stream, execute_stream_partitioned, get_plan_string, + with_new_children_if_necessary, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index d135434898d8f..b01904fbd34b6 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -29,17 +29,18 @@ use super::{ }; use crate::execution_plan::{Boundedness, CardinalityEffect}; use crate::{ - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, - check_if_same_properties, + DisplayFormatType, Distribution, ExecutionPlan, MappedExpr, Partitioning, + RecomputePropertiesBehavior, check_if_same_properties, }; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -193,6 +194,46 @@ impl ExecutionPlan for GlobalLimitExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let Some(ordering) = &self.required_ordering else { + return Ok(Transformed::no(self)); + }; + let mut any_transformed = false; + let mut any_recompute = false; + let mut new_ordering = Vec::with_capacity(ordering.len()); + for sort_expr in ordering.iter() { + let mapped = f(Arc::clone(&sort_expr.expr))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_ordering.push(PhysicalSortExpr { + expr: mapped.expr.data, + options: sort_expr.options, + }); + } + if !any_transformed { + return Ok(Transformed::no(self)); + } + let new_ordering = + LexOrdering::new(new_ordering).or_else(|| self.required_ordering.clone()); + if any_recompute { + let mut new_node = + GlobalLimitExec::new(Arc::clone(&self.input), self.skip, self.fetch); + new_node.set_required_ordering(new_ordering); + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + let mut new_node = (*self).clone(); + new_node.required_ordering = new_ordering; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn with_new_children( self: Arc, mut children: Vec>, @@ -381,6 +422,45 @@ impl ExecutionPlan for LocalLimitExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let Some(ordering) = &self.required_ordering else { + return Ok(Transformed::no(self)); + }; + let mut any_transformed = false; + let mut any_recompute = false; + let mut new_ordering = Vec::with_capacity(ordering.len()); + for sort_expr in ordering.iter() { + let mapped = f(Arc::clone(&sort_expr.expr))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_ordering.push(PhysicalSortExpr { + expr: mapped.expr.data, + options: sort_expr.options, + }); + } + if !any_transformed { + return Ok(Transformed::no(self)); + } + let new_ordering = + LexOrdering::new(new_ordering).or_else(|| self.required_ordering.clone()); + if any_recompute { + let mut new_node = LocalLimitExec::new(Arc::clone(&self.input), self.fetch); + new_node.set_required_ordering(new_ordering); + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + let mut new_node = (*self).clone(); + new_node.required_ordering = new_ordering; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index d607f2b440f66..6b6353c2ed02a 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -26,13 +26,13 @@ use crate::coop::cooperative; use crate::execution_plan::{Boundedness, EmissionType, SchedulingType}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, - RecordBatchStream, SendableRecordBatchStream, + DisplayAs, DisplayFormatType, ExecutionPlan, MappedExpr, Partitioning, + PlanProperties, RecordBatchStream, SendableRecordBatchStream, }; use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Result, assert_eq_or_internal_err, assert_or_internal_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryReservation; @@ -323,6 +323,13 @@ impl ExecutionPlan for LazyMemoryExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index eaa895c821837..0e03ce41760b7 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -24,13 +24,13 @@ use crate::coop::cooperative; use crate::execution_plan::{Boundedness, EmissionType, SchedulingType}; use crate::memory::MemoryStream; use crate::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, - SendableRecordBatchStream, Statistics, common, + DisplayAs, DisplayFormatType, ExecutionPlan, MappedExpr, Partitioning, + PlanProperties, SendableRecordBatchStream, Statistics, common, }; use arrow::array::{ArrayRef, NullArray, RecordBatch, RecordBatchOptions}; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Result, assert_or_internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; @@ -149,6 +149,13 @@ impl ExecutionPlan for PlaceholderRowExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index a4cce0436b10e..c85576b4bd587 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -33,7 +33,10 @@ use crate::filter_pushdown::{ FilterPushdownPropagation, FilterRemapper, PushedDownPredicate, }; use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef}; -use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties}; +use crate::{ + DisplayFormatType, ExecutionPlan, MappedExpr, PhysicalExpr, + RecomputePropertiesBehavior, check_if_same_properties, +}; use std::any::Any; use std::collections::HashMap; use std::pin::Pin; @@ -329,6 +332,51 @@ impl ExecutionPlan for ProjectionExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mut any_transformed = false; + let mut any_recompute = false; + let mut new_exprs: Vec = + Vec::with_capacity(self.projector.projection().as_ref().len()); + for proj_expr in self.projector.projection().as_ref().iter() { + let mapped = f(Arc::clone(&proj_expr.expr))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_exprs.push(ProjectionExpr { + expr: mapped.expr.data, + alias: proj_expr.alias.clone(), + }); + } + if !any_transformed { + return Ok(Transformed::no(self)); + } + if any_recompute { + let new_node = ProjectionExec::try_new(new_exprs, Arc::clone(&self.input))?; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + // Preserve cache: rebuild the projector with new expressions but reuse + // the existing output schema since properties are preserved. + let new_projection = ProjectionExprs::from_expressions( + new_exprs.into_iter().collect::>(), + ); + let new_projector = + new_projection.make_projector(self.input.schema().as_ref())?; + let new_node = ProjectionExec { + projector: new_projector, + input: Arc::clone(&self.input), + metrics: ExecutionPlanMetricsSet::new(), + cache: Arc::clone(&self.cache), + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 049aa9563d52e..17adac17272ab 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -29,8 +29,8 @@ use crate::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput, }; use crate::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, + DisplayAs, DisplayFormatType, ExecutionPlan, MappedExpr, PlanProperties, + RecordBatchStream, SendableRecordBatchStream, }; use arrow::array::{BooleanArray, BooleanBuilder}; use arrow::compute::filter_record_batch; @@ -160,6 +160,13 @@ impl ExecutionPlan for RecursiveQueryExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + // TODO: control these hints and see whether we can // infer some from the child plans (static/recursive terms). fn maintains_input_order(&self) -> Vec { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 342b2f50357c5..b39f3c5f57132 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -40,8 +40,8 @@ use crate::spill::spill_manager::SpillManager; use crate::spill::spill_pool::{self, SpillPoolWriter}; use crate::stream::RecordBatchStreamAdapter; use crate::{ - DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics, - check_if_same_properties, + DisplayFormatType, ExecutionPlan, MappedExpr, Partitioning, PlanProperties, + RecomputePropertiesBehavior, Statistics, check_if_same_properties, }; use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; @@ -49,7 +49,7 @@ use arrow::compute::take_arrays; use arrow::datatypes::{SchemaRef, UInt32Type}; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::utils::transpose; use datafusion_common::{ ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, @@ -929,6 +929,57 @@ impl ExecutionPlan for RepartitionExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let Partitioning::Hash(exprs, n) = self.partitioning() else { + return Ok(Transformed::no(self)); + }; + let n = *n; + let mut any_transformed = false; + let mut any_recompute = false; + let mut new_exprs = Vec::with_capacity(exprs.len()); + for expr in exprs.iter() { + let mapped = f(Arc::clone(expr))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_exprs.push(mapped.expr.data); + } + if !any_transformed { + return Ok(Transformed::no(self)); + } + let new_partitioning = Partitioning::Hash(new_exprs, n); + // For Preserve, we still need to update the partitioning in the cache. + // For both branches, rebuild using try_new (which recomputes cache). + // The Preserve case skips full property recompute by reusing cache structure + // but partitioning must be updated. + if any_recompute { + let mut new_node = + RepartitionExec::try_new(Arc::clone(&self.input), new_partitioning)?; + if self.preserve_order { + new_node = new_node.with_preserve_order(); + } + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + // For preserve, update the partitioning in cache directly + let mut new_cache = (*self.cache).clone(); + new_cache.partitioning = new_partitioning; + let new_node = RepartitionExec { + input: Arc::clone(&self.input), + state: Default::default(), + metrics: ExecutionPlanMetricsSet::new(), + preserve_order: self.preserve_order, + cache: Arc::new(new_cache), + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 127998601fba8..52c28deb2e532 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -61,18 +61,19 @@ use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::sorts::sort::sort_batch; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, - check_if_same_properties, + MappedExpr, Partitioning, PlanProperties, RecomputePropertiesBehavior, + SendableRecordBatchStream, Statistics, check_if_same_properties, }; use arrow::compute::concat_batches; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::utils::evaluate_partition_ranges; use datafusion_execution::{RecordBatchStream, TaskContext}; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use futures::{Stream, StreamExt, ready}; use log::trace; @@ -299,6 +300,54 @@ impl ExecutionPlan for PartialSortExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mut any_transformed = false; + let mut any_recompute = false; + let mut new_sort_exprs = Vec::with_capacity(self.expr.len()); + for sort_expr in self.expr.iter() { + let mapped = f(Arc::clone(&sort_expr.expr))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_sort_exprs.push(PhysicalSortExpr { + expr: mapped.expr.data, + options: sort_expr.options, + }); + } + if !any_transformed { + return Ok(Transformed::no(self)); + } + let new_lex = + LexOrdering::new(new_sort_exprs).unwrap_or_else(|| self.expr.clone()); + if any_recompute { + let new_node = PartialSortExec::new( + new_lex, + Arc::clone(&self.input), + self.common_prefix_length, + ) + .with_fetch(self.fetch) + .with_preserve_partitioning(self.preserve_partitioning); + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + let new_node = PartialSortExec { + input: Arc::clone(&self.input), + expr: new_lex, + common_prefix_length: self.common_prefix_length, + metrics_set: ExecutionPlanMetricsSet::new(), + preserve_partitioning: self.preserve_partitioning, + fetch: self.fetch, + cache: Arc::clone(&self.cache), + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 2c5c82e723f48..c7b8601a7952d 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -50,15 +50,15 @@ use crate::topk::TopK; use crate::topk::TopKDynamicFilters; use crate::{ DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, - ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, - Statistics, + ExecutionPlanProperties, MappedExpr, Partitioning, PlanProperties, + RecomputePropertiesBehavior, SendableRecordBatchStream, Statistics, }; use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray}; use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays}; use arrow::datatypes::SchemaRef; use datafusion_common::config::SpillCompression; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{ DataFusionError, Result, assert_or_internal_err, internal_datafusion_err, unwrap_or_internal_err, @@ -1204,6 +1204,45 @@ impl ExecutionPlan for SortExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mut any_transformed = false; + let mut any_recompute = false; + let mut new_sort_exprs = Vec::with_capacity(self.expr.len()); + for sort_expr in self.expr.iter() { + let mapped = f(Arc::clone(&sort_expr.expr))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_sort_exprs.push(PhysicalSortExpr { + expr: mapped.expr.data, + options: sort_expr.options, + }); + } + if !any_transformed { + return Ok(Transformed::no(self)); + } + let new_lex = + LexOrdering::new(new_sort_exprs).unwrap_or_else(|| self.expr.clone()); + if any_recompute { + let mut new_sort = SortExec::new(new_lex, Arc::clone(&self.input)); + new_sort.preserve_partitioning = self.preserve_partitioning; + if let Some(fetch) = self.fetch { + new_sort = new_sort.with_fetch(Some(fetch)); + } + Ok(Transformed::yes(Arc::new(new_sort) as _)) + } else { + let mut new_sort = self.cloned(); + new_sort.expr = new_lex; + Ok(Transformed::yes(Arc::new(new_sort) as _)) + } + } + fn benefits_from_input_partitioning(&self) -> Vec { vec![false] } @@ -1505,6 +1544,13 @@ mod tests { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn execute( &self, _partition: usize, diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index b1ee5b4d5e8da..29927d8a01c8e 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -27,16 +27,18 @@ use crate::projection::{ProjectionExec, make_with_child, update_ordering}; use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, - check_if_same_properties, + MappedExpr, Partitioning, PlanProperties, RecomputePropertiesBehavior, + SendableRecordBatchStream, Statistics, check_if_same_properties, }; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr::PhysicalExpr; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; +use datafusion_physical_expr_common::sort_expr::{ + LexOrdering, OrderingRequirements, PhysicalSortExpr, +}; use crate::execution_plan::{EvaluationType, SchedulingType}; use log::{debug, trace}; @@ -303,6 +305,49 @@ impl ExecutionPlan for SortPreservingMergeExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mut any_transformed = false; + let mut any_recompute = false; + let mut new_sort_exprs = Vec::with_capacity(self.expr.len()); + for sort_expr in self.expr.iter() { + let mapped = f(Arc::clone(&sort_expr.expr))?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_sort_exprs.push(PhysicalSortExpr { + expr: mapped.expr.data, + options: sort_expr.options, + }); + } + if !any_transformed { + return Ok(Transformed::no(self)); + } + let new_lex = + LexOrdering::new(new_sort_exprs).unwrap_or_else(|| self.expr.clone()); + if any_recompute { + let new_node = SortPreservingMergeExec::new(new_lex, Arc::clone(&self.input)) + .with_fetch(self.fetch) + .with_round_robin_repartition(self.enable_round_robin_repartition); + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + let new_node = SortPreservingMergeExec { + input: Arc::clone(&self.input), + expr: new_lex, + metrics: ExecutionPlanMetricsSet::new(), + fetch: self.fetch, + cache: Arc::clone(&self.cache), + enable_round_robin_repartition: self.enable_round_robin_repartition, + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn with_new_children( self: Arc, mut children: Vec>, @@ -1432,6 +1477,12 @@ mod tests { ) -> Result { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 5a1206629ac7b..c3f762d1339a5 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -31,10 +31,10 @@ use crate::projection::{ ProjectionExec, all_alias_free_columns, new_projections_for_columns, update_ordering, }; use crate::stream::RecordBatchStreamAdapter; -use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; +use crate::{ExecutionPlan, MappedExpr, Partitioning, SendableRecordBatchStream}; use arrow::datatypes::{Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Result, internal_err, plan_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -257,6 +257,13 @@ impl ExecutionPlan for StreamingTableExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index 0630b8f174563..c9d45bdcd1a58 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -32,11 +32,11 @@ use crate::memory::MemoryStream; use crate::metrics::MetricsSet; use crate::stream::RecordBatchStreamAdapter; use crate::streaming::PartitionStream; -use crate::{DisplayAs, DisplayFormatType, PlanProperties}; +use crate::{DisplayAs, DisplayFormatType, MappedExpr, PlanProperties}; use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{ Result, Statistics, assert_or_internal_err, config::ConfigOptions, project_schema, }; @@ -159,6 +159,13 @@ impl ExecutionPlan for TestMemoryExec { Ok(tnr) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index 5458fa7ab8264..f4b185671f3d8 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -18,8 +18,8 @@ //! Simple iterator over batches for use in testing use crate::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, - RecordBatchStream, SendableRecordBatchStream, Statistics, common, + DisplayAs, DisplayFormatType, ExecutionPlan, MappedExpr, Partitioning, + PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, common, execution_plan::Boundedness, }; use crate::{ @@ -36,7 +36,7 @@ use std::{ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{DataFusionError, Result, internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; @@ -208,6 +208,13 @@ impl ExecutionPlan for MockExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, _: Vec>, @@ -452,6 +459,13 @@ impl ExecutionPlan for BarrierExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + /// Returns a stream which yields data fn execute( &self, @@ -595,6 +609,13 @@ impl ExecutionPlan for ErrorExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + /// Returns a stream which yields data fn execute( &self, @@ -685,6 +706,13 @@ impl ExecutionPlan for StatisticsExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, _: Vec>, @@ -803,6 +831,13 @@ impl ExecutionPlan for BlockingExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn execute( &self, _partition: usize, @@ -949,6 +984,13 @@ impl ExecutionPlan for PanicExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, _: Vec>, diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index dafcd6ee4014d..42626a93f9c63 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -28,7 +28,7 @@ use std::{any::Any, sync::Arc}; use super::{ ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, - ExecutionPlanProperties, Partitioning, PlanProperties, RecordBatchStream, + ExecutionPlanProperties, MappedExpr, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; @@ -50,7 +50,7 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{ Result, assert_or_internal_err, exec_err, internal_datafusion_err, }; @@ -281,6 +281,13 @@ impl ExecutionPlan for UnionExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, children: Vec>, @@ -616,6 +623,13 @@ impl ExecutionPlan for InterleaveExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 85799250181b6..6e5fb82be2ba3 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -27,7 +27,7 @@ use super::metrics::{ }; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; use crate::{ - DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream, + DisplayFormatType, Distribution, ExecutionPlan, MappedExpr, RecordBatchStream, SendableRecordBatchStream, check_if_same_properties, }; @@ -43,7 +43,7 @@ use arrow::datatypes::{DataType, Int64Type, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_ord::cmp::lt; use async_trait::async_trait; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{ Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err, internal_err, @@ -249,6 +249,13 @@ impl ExecutionPlan for UnnestExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, mut children: Vec>, diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index d0c44c659c20d..a959c1ac80cd8 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -35,8 +35,9 @@ use crate::windows::{ }; use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, - ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties, + ExecutionPlanProperties, InputOrderMode, MappedExpr, PlanProperties, + RecomputePropertiesBehavior, RecordBatchStream, SendableRecordBatchStream, + Statistics, WindowExpr, check_if_same_properties, }; use arrow::compute::take_record_batch; @@ -48,7 +49,7 @@ use arrow::{ }; use datafusion_common::hash_utils::create_hashes; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::utils::{ evaluate_partition_ranges, get_at_indices, get_row_at_idx, }; @@ -338,6 +339,88 @@ impl ExecutionPlan for BoundedWindowAggExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mut any_transformed = false; + let mut any_recompute = false; + let mut new_window_exprs: Vec> = + Vec::with_capacity(self.window_expr.len()); + + for window_expr in self.window_expr.iter() { + let all = window_expr.all_expressions(); + let mut new_args: Vec> = + Vec::with_capacity(all.args.len()); + let mut new_partition_bys: Vec> = + Vec::with_capacity(all.partition_by_exprs.len()); + let mut new_order_bys: Vec> = + Vec::with_capacity(all.order_by_exprs.len()); + + for arg in all.args { + let mapped = f(arg)?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_args.push(mapped.expr.data); + } + for pb in all.partition_by_exprs { + let mapped = f(pb)?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_partition_bys.push(mapped.expr.data); + } + for ob in all.order_by_exprs { + let mapped = f(ob)?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_order_bys.push(mapped.expr.data); + } + + let new_expr = window_expr + .with_new_expressions(new_args, new_partition_bys, new_order_bys) + .unwrap_or_else(|| Arc::clone(window_expr)); + new_window_exprs.push(new_expr); + } + + if !any_transformed { + return Ok(Transformed::no(self)); + } + + if any_recompute { + let new_node = BoundedWindowAggExec::try_new( + new_window_exprs, + Arc::clone(&self.input), + self.input_order_mode.clone(), + self.can_repartition, + )?; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + let new_node = BoundedWindowAggExec { + input: Arc::clone(&self.input), + window_expr: new_window_exprs, + schema: Arc::clone(&self.schema), + metrics: ExecutionPlanMetricsSet::new(), + input_order_mode: self.input_order_mode.clone(), + ordered_partition_by_indices: self.ordered_partition_by_indices.clone(), + cache: Arc::clone(&self.cache), + can_repartition: self.can_repartition, + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn required_input_ordering(&self) -> Vec> { let partition_bys = self.window_expr()[0].partition_by(); let order_keys = self.window_expr()[0].order_by(); diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index c9958c875c6b6..7ad5ee8e1a697 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -31,8 +31,9 @@ use crate::windows::{ }; use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, - ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties, + ExecutionPlanProperties, MappedExpr, PhysicalExpr, PlanProperties, + RecomputePropertiesBehavior, RecordBatchStream, SendableRecordBatchStream, + Statistics, WindowExpr, check_if_same_properties, }; use arrow::array::ArrayRef; @@ -41,7 +42,7 @@ use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::utils::{evaluate_partition_ranges, transpose}; use datafusion_common::{Result, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; @@ -239,6 +240,86 @@ impl ExecutionPlan for WindowAggExec { Ok(tnr) } + fn map_expressions( + self: Arc, + f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + let mut any_transformed = false; + let mut any_recompute = false; + let mut new_window_exprs: Vec> = + Vec::with_capacity(self.window_expr.len()); + + for window_expr in self.window_expr.iter() { + let all = window_expr.all_expressions(); + let mut new_args: Vec> = + Vec::with_capacity(all.args.len()); + let mut new_partition_bys: Vec> = + Vec::with_capacity(all.partition_by_exprs.len()); + let mut new_order_bys: Vec> = + Vec::with_capacity(all.order_by_exprs.len()); + + for arg in all.args { + let mapped = f(arg)?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_args.push(mapped.expr.data); + } + for pb in all.partition_by_exprs { + let mapped = f(pb)?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_partition_bys.push(mapped.expr.data); + } + for ob in all.order_by_exprs { + let mapped = f(ob)?; + if mapped.expr.transformed { + any_transformed = true; + } + if mapped.recompute == RecomputePropertiesBehavior::Recompute { + any_recompute = true; + } + new_order_bys.push(mapped.expr.data); + } + + let new_expr = window_expr + .with_new_expressions(new_args, new_partition_bys, new_order_bys) + .unwrap_or_else(|| Arc::clone(window_expr)); + new_window_exprs.push(new_expr); + } + + if !any_transformed { + return Ok(Transformed::no(self)); + } + + if any_recompute { + let new_node = WindowAggExec::try_new( + new_window_exprs, + Arc::clone(&self.input), + self.can_repartition, + )?; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } else { + let new_node = WindowAggExec { + input: Arc::clone(&self.input), + window_expr: new_window_exprs, + schema: Arc::clone(&self.schema), + metrics: ExecutionPlanMetricsSet::new(), + ordered_partition_by_indices: self.ordered_partition_by_indices.clone(), + cache: Arc::clone(&self.cache), + can_repartition: self.can_repartition, + }; + Ok(Transformed::yes(Arc::new(new_node) as _)) + } + } + fn maintains_input_order(&self) -> Vec { vec![true] } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index c2ef6bf071c43..32bf7dda045a6 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -25,13 +25,13 @@ use crate::execution_plan::{Boundedness, EmissionType, SchedulingType}; use crate::memory::MemoryStream; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, MappedExpr, PlanProperties, SendableRecordBatchStream, Statistics, }; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::{Result, assert_eq_or_internal_err, internal_datafusion_err}; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryReservation; @@ -197,6 +197,13 @@ impl ExecutionPlan for WorkTableExec { Ok(TreeNodeRecursion::Continue) } + fn map_expressions( + self: Arc, + _f: &mut dyn FnMut(Arc) -> Result, + ) -> Result>> { + Ok(Transformed::no(self)) + } + fn with_new_children( self: Arc, _: Vec>,