Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions datafusion/catalog/src/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use arrow::compute::{and, filter_record_batch};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::error::Result;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
use datafusion_common::{Constraints, DFSchema, SchemaExt, not_impl_err, plan_err};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::memory::{MemSink, MemorySourceConfig};
Expand All @@ -46,8 +46,8 @@ use datafusion_physical_expr::{
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
PhysicalExpr, PlanProperties, common,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, MappedExpr,
Partitioning, PhysicalExpr, PlanProperties, common,
};
use datafusion_session::Session;

Expand Down Expand Up @@ -643,4 +643,11 @@ impl ExecutionPlan for DmlResultExec {
) -> Result<TreeNodeRecursion> {
Ok(TreeNodeRecursion::Continue)
}

/// Expression-rewriting hook for this plan node.
///
/// The callback `_f` is never invoked here: the node is handed back as-is,
/// wrapped in `Transformed::no`.
fn map_expressions(
    self: Arc<Self>,
    _f: &mut dyn FnMut(Arc<dyn PhysicalExpr>) -> Result<MappedExpr>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // The annotation lets `Arc<Self>` coerce to the trait object at the call site.
    let unchanged: Transformed<Arc<dyn ExecutionPlan>> = Transformed::no(self);
    Ok(unchanged)
}
}
54 changes: 51 additions & 3 deletions datafusion/datasource/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
use datafusion_common::{Result, assert_eq_or_internal_err};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalExpr};
use datafusion_physical_expr_common::sort_expr::{LexRequirement, OrderingRequirements};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
PlanProperties, SendableRecordBatchStream, execute_input_stream,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, MappedExpr,
Partitioning, PlanProperties, RecomputePropertiesBehavior, SendableRecordBatchStream,
execute_input_stream,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -233,6 +234,53 @@ impl ExecutionPlan for DataSinkExec {
Ok(TreeNodeRecursion::Continue)
}

/// Applies `f` to every expression in this sink's sort requirement and
/// rebuilds the node if any of them was rewritten.
///
/// * With no `sort_order`, or when `f` reports no expression as transformed,
///   the original node is returned through `Transformed::no`.
/// * If any mapping asks for `RecomputePropertiesBehavior::Recompute`, the
///   node is rebuilt with `DataSinkExec::new`; otherwise the current node is
///   cloned and only its `sort_order` field is replaced.
///
/// The first error returned by `f` is propagated to the caller.
fn map_expressions(
    self: Arc<Self>,
    f: &mut dyn FnMut(Arc<dyn PhysicalExpr>) -> Result<MappedExpr>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    let Some(current_order) = &self.sort_order else {
        return Ok(Transformed::no(self));
    };

    let mut changed = false;
    let mut needs_recompute = false;
    let mut rewritten = Vec::with_capacity(current_order.len());
    for requirement in current_order.iter() {
        let outcome = f(Arc::clone(&requirement.expr))?;
        changed |= outcome.expr.transformed;
        needs_recompute |= outcome.recompute == RecomputePropertiesBehavior::Recompute;
        rewritten.push(
            datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement {
                expr: outcome.expr.data,
                // Sort options are carried over from the original requirement.
                options: requirement.options,
            },
        );
    }

    if !changed {
        return Ok(Transformed::no(self));
    }

    let sort_order = LexRequirement::new(rewritten);
    let rebuilt = if needs_recompute {
        DataSinkExec::new(Arc::clone(&self.input), Arc::clone(&self.sink), sort_order)
    } else {
        DataSinkExec {
            sort_order,
            ..(*self).clone()
        }
    };
    Ok(Transformed::yes(Arc::new(rebuilt) as _))
}

/// Execute the plan and return a stream of `RecordBatch`es for
/// the specified partition.
fn execute(
Expand Down
11 changes: 9 additions & 2 deletions datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::stream::BatchSplitStream;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, MappedExpr, PlanProperties,
};
use itertools::Itertools;

use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
Expand Down Expand Up @@ -303,6 +303,13 @@ impl ExecutionPlan for DataSourceExec {
self.data_source.apply_expressions(f)
}

/// Expression-rewriting hook for this plan node.
///
/// The callback `_f` is never invoked here: the node is handed back as-is,
/// wrapped in `Transformed::no`.
///
/// NOTE(review): the sibling `apply_expressions` forwards to
/// `self.data_source`, so any expressions visited there are not rewritten by
/// this method — confirm that asymmetry is intended.
fn map_expressions(
    self: Arc<Self>,
    _f: &mut dyn FnMut(Arc<dyn PhysicalExpr>) -> Result<MappedExpr>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // The annotation lets `Arc<Self>` coerce to the trait object at the call site.
    let unchanged: Transformed<Arc<dyn ExecutionPlan>> = Transformed::no(self);
    Ok(unchanged)
}

fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
Expand Down
21 changes: 20 additions & 1 deletion datafusion/ffi/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use std::sync::Arc;

use abi_stable::StableAbi;
use abi_stable::std_types::{RString, RVec};
use datafusion_common::tree_node::Transformed;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, MappedExpr, PlanProperties,
};
use tokio::runtime::Handle;

Expand Down Expand Up @@ -310,6 +311,15 @@ impl ExecutionPlan for ForeignExecutionPlan {
}
Ok(tnr)
}

/// Expression-rewriting hook for this plan node.
///
/// The callback `_f` is never invoked here: the node is handed back as-is,
/// wrapped in `Transformed::no`.
fn map_expressions(
    self: Arc<Self>,
    _f: &mut dyn FnMut(
        Arc<dyn datafusion_physical_plan::PhysicalExpr>,
    ) -> Result<MappedExpr>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // The annotation lets `Arc<Self>` coerce to the trait object at the call site.
    let unchanged: Transformed<Arc<dyn ExecutionPlan>> = Transformed::no(self);
    Ok(unchanged)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -400,6 +410,15 @@ pub(crate) mod tests {
}
Ok(tnr)
}

/// Expression-rewriting hook for this test plan node.
///
/// The callback `_f` is never invoked: the node is returned unchanged,
/// wrapped in `Transformed::no`.
fn map_expressions(
    self: Arc<Self>,
    _f: &mut dyn FnMut(
        Arc<dyn datafusion_physical_plan::PhysicalExpr>,
    ) -> Result<MappedExpr>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // The annotation lets `Arc<Self>` coerce to the trait object at the call site.
    let unchanged: Transformed<Arc<dyn ExecutionPlan>> = Transformed::no(self);
    Ok(unchanged)
}
}

#[test]
Expand Down
13 changes: 11 additions & 2 deletions datafusion/physical-optimizer/src/output_requirements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use datafusion_physical_plan::projection::{
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
SendableRecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, MappedExpr,
PlanProperties, SendableRecordBatchStream,
};

/// This rule either adds or removes [`OutputRequirements`]s to/from the physical
Expand Down Expand Up @@ -328,6 +328,15 @@ impl ExecutionPlan for OutputRequirementExec {

Ok(tnr)
}

/// Expression-rewriting hook for this plan node.
///
/// The callback `_f` is never invoked here: the node is handed back as-is,
/// wrapped in `Transformed::no`.
///
/// NOTE(review): if this node's output requirements carry expressions that
/// the sibling `apply_expressions` visits, they are not rewritten by this
/// method — confirm that is intended.
fn map_expressions(
    self: Arc<Self>,
    _f: &mut dyn FnMut(
        Arc<dyn datafusion_physical_expr::PhysicalExpr>,
    ) -> Result<MappedExpr>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // The annotation lets `Arc<Self>` coerce to the trait object at the call site.
    let unchanged: Transformed<Arc<dyn ExecutionPlan>> = Transformed::no(self);
    Ok(unchanged)
}
}

impl PhysicalOptimizerRule for OutputRequirements {
Expand Down
131 changes: 128 additions & 3 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ use crate::filter_pushdown::{
};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::{
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode,
SendableRecordBatchStream, Statistics, check_if_same_properties,
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, MappedExpr,
RecomputePropertiesBehavior, SendableRecordBatchStream, Statistics,
check_if_same_properties,
};
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr::utils::collect_columns;
Expand All @@ -45,7 +46,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_schema::FieldRef;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
use datafusion_common::{
Constraint, Constraints, Result, ScalarValue, assert_eq_or_internal_err, not_impl_err,
};
Expand Down Expand Up @@ -1414,6 +1415,123 @@ impl ExecutionPlan for AggregateExec {
Ok(tnr)
}

/// Applies `f` to every expression owned by this aggregate and rebuilds the
/// node when at least one of them was rewritten.
///
/// Expressions are handed to `f` in a fixed order: each group-by expression,
/// then, aggregate by aggregate, its arguments followed by its order-by
/// expressions, and finally every `FILTER` expression that is present.
/// Group-by aliases, `null_expr`, `groups` and `has_grouping_set` are carried
/// over untouched.
///
/// If nothing is reported as transformed, `self` is returned through
/// `Transformed::no`. Otherwise:
///
/// * when any mapping requested `RecomputePropertiesBehavior::Recompute`, the
///   node is rebuilt with `AggregateExec::try_new` and `limit_options` is
///   copied across afterwards;
/// * otherwise a new node is assembled field by field, reusing the existing
///   `schema`, `cache`, `required_input_ordering`, `input_order_mode` and
///   `dynamic_filter`, together with a fresh metrics set.
///
/// The first error returned by `f` (or by `try_new`) is propagated.
fn map_expressions(
    self: Arc<Self>,
    f: &mut dyn FnMut(Arc<dyn PhysicalExpr>) -> Result<MappedExpr>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    let mut changed = false;
    let mut needs_recompute = false;

    // Runs `f` on one expression, folds its flags into the accumulators above,
    // and yields the (possibly rewritten) expression.
    let mut rewrite = |expr: Arc<dyn PhysicalExpr>| -> Result<Arc<dyn PhysicalExpr>> {
        let outcome = f(expr)?;
        changed |= outcome.expr.transformed;
        needs_recompute |= outcome.recompute == RecomputePropertiesBehavior::Recompute;
        Ok(outcome.expr.data)
    };

    // Group-by expressions: rewritten in place, aliases kept.
    let mut group_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
        Vec::with_capacity(self.group_by.expr.len());
    for (expr, alias) in self.group_by.expr.iter() {
        group_exprs.push((rewrite(Arc::clone(expr))?, alias.clone()));
    }

    // Aggregate functions: arguments first, then order-by expressions.
    let mut aggr_exprs: Vec<Arc<AggregateFunctionExpr>> =
        Vec::with_capacity(self.aggr_expr.len());
    for aggr in self.aggr_expr.iter() {
        let parts = aggr.all_expressions();
        let args: Vec<Arc<dyn PhysicalExpr>> = parts
            .args
            .into_iter()
            .map(&mut rewrite)
            .collect::<Result<_>>()?;
        let order_bys: Vec<Arc<dyn PhysicalExpr>> = parts
            .order_by_exprs
            .into_iter()
            .map(&mut rewrite)
            .collect::<Result<_>>()?;
        // Keep a copy of the original aggregate if it declines the new expressions.
        let rebuilt = match aggr.with_new_expressions(args, order_bys) {
            Some(updated) => updated,
            None => (**aggr).clone(),
        };
        aggr_exprs.push(Arc::new(rebuilt));
    }

    // Per-aggregate FILTER (WHERE ...) expressions; absent filters stay `None`.
    let mut filter_exprs: Vec<Option<Arc<dyn PhysicalExpr>>> =
        Vec::with_capacity(self.filter_expr.len());
    for maybe_filter in self.filter_expr.iter() {
        let mapped = maybe_filter
            .as_ref()
            .map(|filter| rewrite(Arc::clone(filter)))
            .transpose()?;
        filter_exprs.push(mapped);
    }

    if !changed {
        return Ok(Transformed::no(self));
    }

    let group_by = PhysicalGroupBy::new(
        group_exprs,
        self.group_by.null_expr.clone(),
        self.group_by.groups.clone(),
        self.group_by.has_grouping_set,
    );

    let rebuilt = if needs_recompute {
        // Go through the constructor, then restore the limit options it does not take.
        let mut fresh = AggregateExec::try_new(
            self.mode,
            group_by,
            aggr_exprs,
            filter_exprs,
            Arc::clone(&self.input),
            Arc::clone(&self.input_schema),
        )?;
        fresh.limit_options = self.limit_options;
        fresh
    } else {
        // Reuse the existing schema and cached properties; only the expressions change.
        AggregateExec {
            mode: self.mode,
            group_by: Arc::new(group_by),
            aggr_expr: aggr_exprs.into(),
            filter_expr: filter_exprs.into(),
            limit_options: self.limit_options,
            input: Arc::clone(&self.input),
            schema: Arc::clone(&self.schema),
            input_schema: Arc::clone(&self.input_schema),
            metrics: ExecutionPlanMetricsSet::new(),
            required_input_ordering: self.required_input_ordering.clone(),
            input_order_mode: self.input_order_mode.clone(),
            cache: Arc::clone(&self.cache),
            dynamic_filter: self.dynamic_filter.clone(),
        }
    };
    Ok(Transformed::yes(Arc::new(rebuilt) as _))
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
Expand Down Expand Up @@ -2518,6 +2636,13 @@ mod tests {
Ok(TreeNodeRecursion::Continue)
}

/// Expression-rewriting hook for this test plan node.
///
/// The callback `_f` is never invoked: the node is returned unchanged,
/// wrapped in `Transformed::no`.
fn map_expressions(
    self: Arc<Self>,
    _f: &mut dyn FnMut(Arc<dyn PhysicalExpr>) -> Result<MappedExpr>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // The annotation lets `Arc<Self>` coerce to the trait object at the call site.
    let unchanged: Transformed<Arc<dyn ExecutionPlan>> = Transformed::no(self);
    Ok(unchanged)
}

fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
Expand Down
11 changes: 9 additions & 2 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ use super::{
};
use crate::display::DisplayableExecutionPlan;
use crate::metrics::MetricType;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
use crate::{DisplayFormatType, ExecutionPlan, MappedExpr, Partitioning};

use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::instant::Instant;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
Expand Down Expand Up @@ -152,6 +152,13 @@ impl ExecutionPlan for AnalyzeExec {
Ok(TreeNodeRecursion::Continue)
}

/// Expression-rewriting hook for this plan node.
///
/// The callback `_f` is never invoked here: the node is handed back as-is,
/// wrapped in `Transformed::no`.
fn map_expressions(
    self: Arc<Self>,
    _f: &mut dyn FnMut(Arc<dyn PhysicalExpr>) -> Result<MappedExpr>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // The annotation lets `Arc<Self>` coerce to the trait object at the call site.
    let unchanged: Transformed<Arc<dyn ExecutionPlan>> = Transformed::no(self);
    Ok(unchanged)
}

fn with_new_children(
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
Expand Down
Loading
Loading