
[AURON #2177] Implement native support for lag window function #2199

Open
officialasishkumar wants to merge 1 commit into apache:master from officialasishkumar:feat/native-window-lag

Conversation

@officialasishkumar

Which issue does this PR close?

Closes #2177

Rationale for this change

Auron's native window support previously covered rank-like functions and a subset of aggregate window functions, but did not support offset-based window functions such as lag(...).

This PR extends native window coverage to include lag(...):

  • support lag(...)
  • preserve Spark-compatible behavior for input, inputOffset, and default
  • keep unsupported semantics out of the native path rather than approximating them incorrectly

What changes are included in this PR?

This PR:

  • adds Lag handling in NativeWindowBase
  • extends the protobuf/planner window function enum with LAG
  • adds native planner support to decode LAG into the native window plan
  • introduces a native LagProcessor in datafusion-ext-plans
  • evaluates lag using Spark-compatible offset/default/null behavior
  • adds a full-partition processing path for lag so that lookback works correctly across input batches
  • adds Rust regression coverage for cross-batch lag
  • adds Scala regression tests for:
    • native lag(...) execution
    • Spark fallback for lag(...) IGNORE NULLS

The native implementation supports Spark semantics for:

  • lag(input)

    • default offset is 1
    • default value is null
  • lag(input, offset, default)

    • returns the value of input at the offset-th row before the current row in the same window partition
    • if the target row exists and input there is null, returns null
    • if the target row does not exist, returns default
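The semantics above can be sketched as a small standalone function (illustrative only, assuming a single fully materialized partition; this is not the PR's actual LagProcessor API):

```rust
// Hypothetical sketch of Spark's lag(input, offset, default) over one
// partition: the value `offset` rows back, or `default` when no such row
// exists. A null at the target row is returned as null, not as the default.
fn lag(input: &[Option<i64>], offset: usize, default: Option<i64>) -> Vec<Option<i64>> {
    (0..input.len())
        .map(|i| {
            if i >= offset {
                // Target row exists: return its value, even when it is null.
                input[i - offset]
            } else {
                // No row `offset` positions back in this partition: use the default.
                default
            }
        })
        .collect()
}

fn main() {
    // lag(x, 2, -1) over the partition [10, null, 30, 40]
    let out = lag(&[Some(10), None, Some(30), Some(40)], 2, Some(-1));
    assert_eq!(out, vec![Some(-1), Some(-1), Some(10), None]);
}
```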

Supported scope in this PR:

  • standard RESPECT NULLS behavior

Not supported natively in this PR:

  • IGNORE NULLS

Unsupported IGNORE NULLS queries continue to fall back to Spark to preserve correctness.

The full-partition processing infrastructure added here mirrors the approach used for offset-based window functions, ensuring all rows in a partition are available before computing lag values across batch boundaries.
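To illustrate why the full-partition path matters, here is a minimal plain-Rust sketch (hypothetical helper, not the actual processor) showing that per-batch evaluation loses cross-batch lookback, while evaluating over the concatenated partition does not:

```rust
// lag with offset 1 over a single fully materialized partition slice.
fn lag1(part: &[i64], default: i64) -> Vec<i64> {
    (0..part.len())
        .map(|i| if i >= 1 { part[i - 1] } else { default })
        .collect()
}

fn main() {
    let batch1 = vec![1, 2];
    let batch2 = vec![3, 4];

    // Per-batch evaluation: the row holding `3` wrongly gets the default
    // instead of `2`, because its predecessor lives in the earlier batch.
    let per_batch: Vec<i64> = [lag1(&batch1, -1), lag1(&batch2, -1)].concat();
    assert_eq!(per_batch, vec![-1, 1, -1, 3]);

    // Full-partition evaluation after concatenation: lookback crosses the
    // original batch boundary correctly.
    let full = lag1(&[batch1, batch2].concat(), -1);
    assert_eq!(full, vec![-1, 1, 2, 3]);
}
```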

Are there any user-facing changes?

Yes.
Queries using lag(...) can now remain on Auron's native window execution path when they use supported semantics.
Queries using unsupported lag(...) IGNORE NULLS behavior will continue to fall back to Spark.

How was this patch tested?

CI.

Signed-off-by: Asish Kumar <officialasishkumar@gmail.com>

Copilot AI left a comment


Pull request overview

Adds native execution support for Spark’s lag(...) window function in Auron’s native window path, including planner/protobuf wiring and regression tests, while explicitly falling back for unsupported IGNORE NULLS semantics.

Changes:

  • Add Lag handling in Spark-side native window plan construction (with Spark fallback for IGNORE NULLS).
  • Extend native planner + protobuf to represent LAG, and implement a native Rust LagProcessor.
  • Add Rust + Scala regression tests, including cross-batch correctness and Spark fallback coverage.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.

Summary per file:

  • spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala: Adds Spark-side detection/planning for LAG window expressions and blocks IGNORE NULLS.
  • spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronWindowSuite.scala: Adds Scala tests for native lag and Spark fallback for IGNORE NULLS.
  • native-engine/datafusion-ext-plans/src/window_exec.rs: Adds “full-partition” execution path (concat all batches) to support cross-batch lag evaluation.
  • native-engine/datafusion-ext-plans/src/window/window_context.rs: Adds requires_full_partition() helper to drive execution strategy.
  • native-engine/datafusion-ext-plans/src/window/processors/mod.rs: Exposes new lag_processor module.
  • native-engine/datafusion-ext-plans/src/window/processors/lag_processor.rs: Implements LagProcessor computing lag with offset/default semantics.
  • native-engine/datafusion-ext-plans/src/window/mod.rs: Adds WindowFunction::Lag wiring and a requires_full_partition() marker on expressions.
  • native-engine/auron-planner/src/planner.rs: Decodes protobuf LAG into native WindowFunction::Lag.
  • native-engine/auron-planner/proto/auron.proto: Extends protobuf enum with LAG.


Comment on lines +93 to +97
private def lagIgnoreNulls(expr: Lag): Boolean =
  expr.getClass.getMethods
    .find(method => method.getName == "ignoreNulls" && method.getParameterCount == 0)
    .exists(method => method.invoke(expr).asInstanceOf[Boolean])

Comment on lines +222 to +239
if window_ctx.requires_full_partition() {
    let mut staging_batches = vec![];
    while let Some(batch) = input.next().await.transpose()? {
        staging_batches.push(batch);
    }

    let outputs: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .cloned()
        .chain(if window_ctx.output_window_cols {
            window_cols
        } else {
            vec![]
        })
        .zip(window_ctx.output_schema.fields())
        .map(|(array, field)| {
            if array.data_type() != field.data_type() {
                return cast(&array, field.data_type());
            }
            Ok(array.clone())
        })
        .collect::<Result<_>>()?;
    let output_batch = RecordBatch::try_new_with_options(
        window_ctx.output_schema.clone(),
        outputs,
        &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
    )?;
    if !staging_batches.is_empty() {
        let _timer = elapsed_compute.timer();
        let batch = concat_batches(&window_ctx.input_schema, &staging_batches)?;
        let output_batch =
            process_window_batch(batch, &window_ctx, processors.as_mut_slice())?;
        exec_ctx
            .baseline_metrics()
            .record_output(output_batch.num_rows());
        sender.send(output_batch).await;
    }
    return Ok(());
}

+1 to this: the full-partition path buffers all input batches, then concat_batches allocates another contiguous copy. Peak memory is ~3x the partition size, all live simultaneously.

Comment on lines +45 to +65
let input_values = self.children[0]
    .evaluate(batch)
    .and_then(|v| v.into_array(batch.num_rows()))?;

let offset_values = self.children[1]
    .evaluate(batch)
    .and_then(|v| v.into_array(batch.num_rows()))?;
let offset_values = if offset_values.data_type() == &DataType::Int32 {
    offset_values
} else {
    cast(&offset_values, &DataType::Int32)?
};
let offset = match ScalarValue::try_from_array(&offset_values, 0)? {
    ScalarValue::Int32(Some(offset)) => offset as i64,
    other => {
        return Err(DataFusionError::Execution(format!(
            "lag offset must be a non-null foldable integer, got {other:?}",
        )));
    }
};

Comment on lines +39 to +43
assert_eq!(
    self.children.len(),
    3,
    "lag expects input/offset/default children",
);

@ShreyeshArangath left a comment


Left some high-level comments on the approach; I might be missing details here. If you can capture them in the PR description, it'll be useful for other reviewers as well.


let value = if target_idx >= partition_start && target_idx < partition_end {
    ScalarValue::try_from_array(&input_values, target_idx as usize)?
} else {
    ScalarValue::try_from_array(&default_values, row_idx)?


If I understand correctly, the row-by-row ScalarValue::try_from_array + ScalarValue::iter_to_array pattern creates N heap-allocated scalar objects. For the large partitions that full-partition buffering implies, this will become an issue. Can we use arrow::compute::take with a pre-computed indices array to gather values in O(1) allocations?
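As a sketch of this suggestion (plain Rust without the arrow dependency; `lag_via_take` is a hypothetical name), precomputing one gather index per output row replaces the per-row scalar materialization:

```rust
// Hypothetical sketch: precompute gather indices, then do a single gather
// pass. With arrow this would be an index array (null slots marking "use
// the default") passed to one arrow::compute::take call.
fn lag_via_take(values: &[i64], offset: i64, partition_start: i64, default: i64) -> Vec<i64> {
    let partition_end = partition_start + values.len() as i64;
    // One index per output row; None marks "target is outside the partition".
    let indices: Vec<Option<usize>> = (partition_start..partition_end)
        .map(|row| {
            let target = row - offset;
            if target >= partition_start && target < partition_end {
                Some((target - partition_start) as usize)
            } else {
                None
            }
        })
        .collect();
    // Single gather over the precomputed indices: no per-row scalar objects.
    indices
        .into_iter()
        .map(|idx| idx.map_or(default, |i| values[i]))
        .collect()
}

fn main() {
    assert_eq!(lag_via_take(&[5, 6, 7], 1, 0, -1), vec![-1, 5, 6]);
}
```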

)?;
if !staging_batches.is_empty() {
    let _timer = elapsed_compute.timer();
    let batch = concat_batches(&window_ctx.input_schema, &staging_batches)?;


The full-partition buffering approach works okay but introduces unbounded memory risk for skewed partitions. Since the input is guaranteed sorted by partition keys, lag can be implemented as a streaming processor (like Rank/Agg) with an O(offset) ring buffer. This would eliminate the need for requires_full_partition(), concat_batches, and the dual code paths in execute_window. Wanted to check with you and see if you had considered a streaming approach here?
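A minimal sketch of such a streaming processor (hypothetical `StreamingLag`, plain Rust, ignoring null and type handling) could look like:

```rust
use std::collections::VecDeque;

// Hypothetical streaming lag: an O(offset) ring buffer fed rows in
// partition order, so no full-partition buffering or concat is needed.
struct StreamingLag {
    offset: usize,
    default: i64,
    buf: VecDeque<i64>, // holds at most `offset` prior rows
}

impl StreamingLag {
    fn new(offset: usize, default: i64) -> Self {
        Self { offset, default, buf: VecDeque::with_capacity(offset) }
    }

    /// Feed one row (in partition order); returns its lag value.
    fn push(&mut self, value: i64) -> i64 {
        if self.offset == 0 {
            return value; // lag(x, 0) is the current row
        }
        let out = if self.buf.len() == self.offset {
            // The row `offset` positions back has surfaced at the front.
            self.buf.pop_front().unwrap()
        } else {
            // Fewer than `offset` rows seen so far in this partition.
            self.default
        };
        self.buf.push_back(value);
        out
    }

    /// Reset at every partition boundary.
    fn new_partition(&mut self) {
        self.buf.clear();
    }
}

fn main() {
    let mut lag = StreamingLag::new(2, -1);
    let out: Vec<i64> = [1, 2, 3, 4].iter().map(|&v| lag.push(v)).collect();
    assert_eq!(out, vec![-1, -1, 1, 2]);
}
```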


Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement native support for lag window function

3 participants