From b701938451ce92e53667f85b1025eb1cc8b778df Mon Sep 17 00:00:00 2001 From: Ford Date: Thu, 26 Mar 2026 19:47:24 -0700 Subject: [PATCH 1/2] feat(common): add filter pushthrough optimization for evm_decode Automatically rewrites filters on evm_decode output fields into equivalent filters on raw topic columns when the field corresponds to an indexed event parameter. The derived topic filter is pushed down to the parquet scan level, enabling statistics pruning, bloom filters, and row-level pushdown before any decoding occurs. --- .../core/common/src/context/session_state.rs | 7 +- .../common/src/filter_pushthrough_decode.rs | 944 ++++++++++++++++++ crates/core/common/src/lib.rs | 1 + crates/core/common/src/udfs.rs | 2 +- crates/core/common/src/udfs/evm_common.rs | 16 + 5 files changed, 968 insertions(+), 2 deletions(-) create mode 100644 crates/core/common/src/filter_pushthrough_decode.rs diff --git a/crates/core/common/src/context/session_state.rs b/crates/core/common/src/context/session_state.rs index c83c2a979..4770b8d7f 100644 --- a/crates/core/common/src/context/session_state.rs +++ b/crates/core/common/src/context/session_state.rs @@ -124,13 +124,18 @@ impl SessionState { /// Applies DataFusion logical optimizations to an existing plan. /// + /// Runs custom pre-optimisation rewrites (filter pushthrough for + /// `evm_decode`) before delegating to DataFusion's standard optimizer. + /// This ensures derived predicates participate in predicate pushdown. + /// /// Does not trigger SQL pre-resolution; the plan is assumed to be fully /// resolved already. /// /// Mirrors [`DfSessionState::optimize`] (synchronous). #[tracing::instrument(skip_all, err)] pub fn optimize(&self, plan: &LogicalPlan) -> Result { - self.state.optimize(plan) + let plan = crate::filter_pushthrough_decode::filter_pushthrough_decode(plan.clone())?; + self.state.optimize(&plan) } /// Returns a new `SessionState` with async catalog pre-resolution diff --git a/crates/core/common/src/filter_pushthrough_decode.rs b/crates/core/common/src/filter_pushthrough_decode.rs new file mode 100644 index 000000000..31da4d2f7 --- /dev/null +++ b/crates/core/common/src/filter_pushthrough_decode.rs @@ -0,0 +1,944 @@ +//! Filter pushthrough for `evm_decode` / `evm_decode_log`. +//! +//! When a query filters on a decoded field that maps to an indexed event +//! parameter, this rewrite derives an equivalent filter on the raw topic +//! column. The derived filter is added conjunctively (AND) with the original +//! so that downstream pushdown optimisations can push it to the scan level. + +use std::sync::Arc; + +use alloy::dyn_abi::DynSolType; +use datafusion::{ + common::tree_node::{Transformed, TreeNode as _, TreeNodeRewriter}, + error::DataFusionError, + logical_expr::{BinaryExpr, Filter, LogicalPlan, Operator, Projection, SubqueryAlias}, + prelude::Expr, + scalar::ScalarValue, +}; + +use crate::udfs::evm_common::Event; + +/// Rewrites a logical plan, injecting derived topic-column predicates for +/// filters on `evm_decode` / `evm_decode_log` output fields that correspond +/// to indexed event parameters. +pub fn filter_pushthrough_decode(plan: LogicalPlan) -> Result { + let mut rewriter = FilterPushthroughDecode; + plan.rewrite(&mut rewriter).map(|t| t.data) +} + +// --------------------------------------------------------------------------- +// TreeNodeRewriter +// --------------------------------------------------------------------------- + +struct FilterPushthroughDecode; + +/// A derived topic predicate, tagged with where it should be placed. +enum Derived { + /// The evm_decode call is inline in the filter expression — the derived + /// predicate references columns visible at the current filter level and + /// can simply be ANDed onto the filter predicate. + Local(Expr), + /// The evm_decode call lives inside a child Projection behind a column + /// alias. The derived predicate references topic columns that are only + /// visible *below* that projection, so it must be injected as a new + /// Filter between the Projection and its input. + Deep { col_name: String, predicate: Expr }, +} + +impl TreeNodeRewriter for FilterPushthroughDecode { + type Node = LogicalPlan; + + fn f_up(&mut self, node: Self::Node) -> Result, DataFusionError> { + let LogicalPlan::Filter(ref filter) = node else { + return Ok(Transformed::no(node)); + }; + + let derivations = derive_topic_predicates(&filter.predicate, &filter.input); + if derivations.is_empty() { + return Ok(Transformed::no(node)); + } + + let mut new_predicate = filter.predicate.clone(); + let mut new_input: LogicalPlan = (*filter.input).clone(); + + for d in derivations { + match d { + Derived::Local(pred) => { + new_predicate = new_predicate.and(pred); + } + Derived::Deep { col_name, predicate } => { + new_input = + inject_filter_below_decode_projection(new_input, &col_name, predicate); + } + } + } + + let new_filter = + LogicalPlan::Filter(Filter::try_new(new_predicate, Arc::new(new_input))?); + Ok(Transformed::yes(new_filter)) + } +} + +// --------------------------------------------------------------------------- +// Predicate derivation +// --------------------------------------------------------------------------- + +/// Walks the predicate expression tree and collects derived topic predicates. +fn derive_topic_predicates(predicate: &Expr, input: &Arc) -> Vec { + let mut derived = Vec::new(); + collect_derived(predicate, input, &mut derived); + derived +} + +fn collect_derived(expr: &Expr, input: &Arc, out: &mut Vec) { + match expr { + // Recurse into AND conjunctions. + Expr::BinaryExpr(binary) if binary.op == Operator::And => { + collect_derived(&binary.left, input, out); + collect_derived(&binary.right, input, out); + } + + // Equality: get_field(evm_decode(...), field) = literal + Expr::BinaryExpr(binary) if binary.op == Operator::Eq => { + if let Some(d) = try_derive_eq(&binary.left, &binary.right, input) { + out.push(d); + } else if let Some(d) = try_derive_eq(&binary.right, &binary.left, input) { + out.push(d); + } + } + + // IN list: get_field(evm_decode(...), field) IN (lit, lit, ...) + Expr::InList(in_list) if !in_list.negated => { + if let Some(d) = try_derive_in_list(&in_list.expr, &in_list.list, input) { + out.push(d); + } + } + + _ => {} + } +} + +/// Attempts to derive a `topic_col = encoded_literal` predicate. +fn try_derive_eq( + field_expr: &Expr, + literal_expr: &Expr, + input: &Arc, +) -> Option { + let resolved = resolve_get_field_to_topic(field_expr, input)?; + let literal_bytes = extract_literal_bytes(literal_expr)?; + + if !is_value_type(&resolved.sol_type) { + return None; + } + + let encoded = encode_to_topic(&resolved.sol_type, &literal_bytes).ok()?; + let encoded_literal = ScalarValue::FixedSizeBinary(32, Some(encoded.to_vec())); + + let predicate = Expr::BinaryExpr(BinaryExpr::new( + Box::new(resolved.topic_col_expr), + Operator::Eq, + Box::new(Expr::Literal(encoded_literal, None)), + )); + + Some(match resolved.col_name { + None => Derived::Local(predicate), + Some(col_name) => Derived::Deep { col_name, predicate }, + }) +} + +/// Attempts to derive a `topic_col IN (enc1, enc2, ...)` predicate. +fn try_derive_in_list( + field_expr: &Expr, + list: &[Expr], + input: &Arc, +) -> Option { + let resolved = resolve_get_field_to_topic(field_expr, input)?; + + if !is_value_type(&resolved.sol_type) { + return None; + } + + let mut encoded_list = Vec::with_capacity(list.len()); + for item in list { + let bytes = extract_literal_bytes(item)?; + let encoded = encode_to_topic(&resolved.sol_type, &bytes).ok()?; + encoded_list.push(Expr::Literal( + ScalarValue::FixedSizeBinary(32, Some(encoded.to_vec())), + None, + )); + } + + let predicate = Expr::InList(datafusion::logical_expr::expr::InList::new( + Box::new(resolved.topic_col_expr), + encoded_list, + false, + )); + + Some(match resolved.col_name { + None => Derived::Local(predicate), + Some(col_name) => Derived::Deep { col_name, predicate }, + }) +} + +// --------------------------------------------------------------------------- +// Expression pattern matching +// --------------------------------------------------------------------------- + +/// Result of resolving a `get_field(evm_decode_log(...), field)` expression. +struct ResolvedTopic { + /// The expression referencing the topic column (e.g. `Column("topic2")`). + topic_col_expr: Expr, + /// The Solidity type of the indexed parameter. + sol_type: DynSolType, + /// If the evm_decode was resolved through a column alias in a child + /// projection, this is the alias name. `None` when the evm_decode call + /// appears directly in the filter expression. + col_name: Option, +} + +/// Resolves a `get_field(evm_decode_log(...), field_name)` expression. +/// +/// Handles both: +/// - Direct inline `evm_decode_log(...)` references in the expression. +/// - Column references that resolve to `evm_decode_log(...)` via a child +/// projection. +fn resolve_get_field_to_topic(expr: &Expr, input: &Arc) -> Option { + let Expr::ScalarFunction(get_field_sf) = expr else { + return None; + }; + if get_field_sf.func.name() != "get_field" || get_field_sf.args.len() < 2 { + return None; + } + + let field_name = match &get_field_sf.args[1] { + Expr::Literal(ScalarValue::Utf8(Some(name)), _) => name.as_str(), + _ => return None, + }; + + let struct_expr = &get_field_sf.args[0]; + + // Case 1: direct evm_decode_log(...) call. + if let Expr::ScalarFunction(sf) = struct_expr + && let Some((topic_col_expr, sol_type)) = resolve_from_decode_call(sf, field_name) + { + return Some(ResolvedTopic { + topic_col_expr, + sol_type, + col_name: None, + }); + } + + // Case 2: column reference → resolve through the input plan. + if let Expr::Column(col) = struct_expr + && let Some(sf) = find_evm_decode_in_plan(&col.name, input) + && let Some((topic_col_expr, sol_type)) = resolve_from_decode_call(&sf, field_name) + { + return Some(ResolvedTopic { + topic_col_expr, + sol_type, + col_name: Some(col.name.clone()), + }); + } + + None +} + +/// Given a `ScalarFunction` that is an `evm_decode_log` / `evm_decode` call, +/// resolve which topic column corresponds to `field_name` and return the +/// topic column expression together with the Solidity type. +fn resolve_from_decode_call( + sf: &datafusion::logical_expr::expr::ScalarFunction, + field_name: &str, +) -> Option<(Expr, DynSolType)> { + let name = sf.func.name(); + if name != "evm_decode_log" && name != "evm_decode" { + return None; + } + + // evm_decode_log(topic1, topic2, topic3, data, signature) + if sf.args.len() != 5 { + return None; + } + + let sig_str = match &sf.args[4] { + Expr::Literal(ScalarValue::Utf8(Some(s)), _) => s, + _ => return None, + }; + + let event: Event = sig_str.parse().ok()?; + let (topic_index, sol_type) = event.find_indexed_param(field_name)?; + + // topic_index 0 → sf.args[0] (topic1), 1 → sf.args[1] (topic2), etc. + let topic_col_expr: Expr = sf.args[topic_index].clone(); + let sol_type: DynSolType = sol_type.clone(); + Some((topic_col_expr, sol_type)) +} + +/// Walks the plan tree below a Filter to find a Projection expression +/// aliased to `col_name` that is an `evm_decode_log` / `evm_decode` call. +fn find_evm_decode_in_plan( + col_name: &str, + plan: &LogicalPlan, +) -> Option { + match plan { + LogicalPlan::Projection(proj) => { + for expr in &proj.expr { + let (alias_name, inner) = match expr { + Expr::Alias(alias) => (alias.name.as_str(), alias.expr.as_ref()), + _ => continue, + }; + if alias_name != col_name { + continue; + } + if let Expr::ScalarFunction(sf) = inner { + let n = sf.func.name(); + if n == "evm_decode_log" || n == "evm_decode" { + return Some(sf.clone()); + } + } + } + None + } + LogicalPlan::SubqueryAlias(sq) => find_evm_decode_in_plan(col_name, &sq.input), + _ => None, + } +} + +// --------------------------------------------------------------------------- +// Deep-injection: place a derived predicate below a decode projection +// --------------------------------------------------------------------------- + +/// Returns `true` if `proj` contains an `evm_decode_log` / `evm_decode` +/// expression aliased to `col_name`. +fn projection_has_decode_alias(proj: &Projection, col_name: &str) -> bool { + proj.expr.iter().any(|e| { + if let Expr::Alias(alias) = e + && alias.name == col_name + && let Expr::ScalarFunction(sf) = alias.expr.as_ref() + { + let n = sf.func.name(); + n == "evm_decode_log" || n == "evm_decode" + } else { + false + } + }) +} + +/// Walks through SubqueryAlias/Projection nodes and injects `predicate` as a +/// new Filter between the decode Projection and its input. Returns the +/// modified plan. If the target projection is not found, the plan is +/// returned unchanged (safety: no modification on mismatch). +fn inject_filter_below_decode_projection( + plan: LogicalPlan, + col_name: &str, + predicate: Expr, +) -> LogicalPlan { + match plan { + LogicalPlan::Projection(proj) if projection_has_decode_alias(&proj, col_name) => { + // Inject a Filter between the Projection and its input. + match Filter::try_new(predicate, proj.input.clone()) { + Ok(new_filter) => Projection::try_new( + proj.expr.clone(), + Arc::new(LogicalPlan::Filter(new_filter)), + ) + .map(LogicalPlan::Projection) + .unwrap_or(LogicalPlan::Projection(proj)), + Err(_) => LogicalPlan::Projection(proj), + } + } + LogicalPlan::SubqueryAlias(sq) => { + let modified_inner = + inject_filter_below_decode_projection((*sq.input).clone(), col_name, predicate); + match SubqueryAlias::try_new(Arc::new(modified_inner), sq.alias.clone()) { + Ok(new_sq) => LogicalPlan::SubqueryAlias(new_sq), + Err(_) => LogicalPlan::SubqueryAlias(sq), + } + } + other => other, + } +} + +// --------------------------------------------------------------------------- +// Literal extraction +// --------------------------------------------------------------------------- + +/// Extracts raw bytes from a literal expression suitable for ABI encoding. +fn extract_literal_bytes(expr: &Expr) -> Option> { + match expr { + Expr::Literal(scalar, _) => extract_scalar_bytes(scalar), + // Handle CAST(literal AS type) — DF may wrap literals in casts. + Expr::Cast(cast) => extract_literal_bytes(&cast.expr), + Expr::TryCast(cast) => extract_literal_bytes(&cast.expr), + _ => None, + } +} + +fn extract_scalar_bytes(scalar: &ScalarValue) -> Option> { + match scalar { + ScalarValue::Binary(Some(b)) | ScalarValue::LargeBinary(Some(b)) => Some(b.clone()), + ScalarValue::FixedSizeBinary(_, Some(b)) => Some(b.clone()), + ScalarValue::Boolean(Some(b)) => Some(vec![*b as u8]), + ScalarValue::UInt8(Some(n)) => Some(strip_leading_zeros(&n.to_be_bytes())), + ScalarValue::UInt16(Some(n)) => Some(strip_leading_zeros(&n.to_be_bytes())), + ScalarValue::UInt32(Some(n)) => Some(strip_leading_zeros(&n.to_be_bytes())), + ScalarValue::UInt64(Some(n)) => Some(strip_leading_zeros(&n.to_be_bytes())), + ScalarValue::Int8(Some(n)) => Some(n.to_be_bytes().to_vec()), + ScalarValue::Int16(Some(n)) => Some(n.to_be_bytes().to_vec()), + ScalarValue::Int32(Some(n)) => Some(n.to_be_bytes().to_vec()), + ScalarValue::Int64(Some(n)) => Some(n.to_be_bytes().to_vec()), + _ => None, + } +} + +/// Strips leading zero bytes so that `encode_to_topic` can left-pad +/// correctly for unsigned integers. +fn strip_leading_zeros(bytes: &[u8]) -> Vec { + let first_nonzero = bytes.iter().position(|&b| b != 0).unwrap_or(bytes.len()); + if first_nonzero == bytes.len() { + // Value is zero — keep one byte. + vec![0] + } else { + bytes[first_nonzero..].to_vec() + } +} + +// --------------------------------------------------------------------------- +// ABI topic encoding +// --------------------------------------------------------------------------- + +/// Returns `true` if the Solidity type is a value type that can be +/// meaningfully encoded into a 32-byte topic word. Reference types +/// (`string`, `bytes`, arrays, tuples) are hashed by Solidity, so the +/// original value cannot be reconstructed from the topic. +fn is_value_type(ty: &DynSolType) -> bool { + matches!( + ty, + DynSolType::Address + | DynSolType::Bool + | DynSolType::Uint(_) + | DynSolType::Int(_) + | DynSolType::FixedBytes(_) + ) +} + +/// Encodes a raw value into a 32-byte topic word following Solidity's +/// indexed-parameter encoding rules. +fn encode_to_topic(ty: &DynSolType, value: &[u8]) -> Result<[u8; 32], DataFusionError> { + let mut word = [0u8; 32]; + match ty { + DynSolType::Address => { + // Addresses are 20 bytes, left-padded with 12 zero bytes. + if value.len() > 20 { + return Err(DataFusionError::Plan(format!( + "filter pushthrough: expected <=20 bytes for address, got {}", + value.len() + ))); + } + word[32 - value.len()..].copy_from_slice(value); + } + DynSolType::Bool => { + word[31] = u8::from(!value.is_empty() && value[value.len() - 1] != 0); + } + DynSolType::Uint(_) => { + // Big-endian, left-padded with zeros. + if value.len() > 32 { + return Err(DataFusionError::Plan(format!( + "filter pushthrough: value too large for uint ({}B)", + value.len() + ))); + } + word[32 - value.len()..].copy_from_slice(value); + } + DynSolType::Int(_) => { + // Big-endian, sign-extended. + if value.len() > 32 { + return Err(DataFusionError::Plan(format!( + "filter pushthrough: value too large for int ({}B)", + value.len() + ))); + } + let sign_byte = if !value.is_empty() && value[0] & 0x80 != 0 { + 0xFF + } else { + 0x00 + }; + word[..32 - value.len()].fill(sign_byte); + word[32 - value.len()..].copy_from_slice(value); + } + DynSolType::FixedBytes(n) => { + // Right-padded: value occupies the leftmost bytes. + let n = *n; + if value.len() > n { + return Err(DataFusionError::Plan(format!( + "filter pushthrough: value too large for bytes{n} ({}B)", + value.len() + ))); + } + word[..value.len()].copy_from_slice(value); + } + _ => { + return Err(DataFusionError::Plan(format!( + "filter pushthrough: unsupported type for topic encoding: {ty:?}" + ))); + } + } + Ok(word) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::{ + arrow::{ + array::RecordBatch, + datatypes::{DataType, Field, Schema}, + }, + common::Column, + datasource::{MemTable, provider_as_source}, + logical_expr::{ + LogicalPlan, LogicalPlanBuilder, + expr::{InList, ScalarFunction as ScalarFunctionExpr}, + }, + prelude::Expr, + scalar::ScalarValue, + }; + + use super::*; + use crate::udfs::evm_decode_log::EvmDecodeLog; + + /// Shorthand for creating an `Expr::Literal` in DF 52 (which requires metadata). + fn scalar_lit(val: ScalarValue) -> Expr { + Expr::Literal(val, None) + } + + /// Helper: build a table-scan plan with the standard log columns. + fn logs_scan() -> LogicalPlan { + let schema = Arc::new(Schema::new(vec![ + Field::new("topic0", DataType::FixedSizeBinary(32), true), + Field::new("topic1", DataType::FixedSizeBinary(32), true), + Field::new("topic2", DataType::FixedSizeBinary(32), true), + Field::new("topic3", DataType::FixedSizeBinary(32), true), + Field::new("data", DataType::Binary, true), + Field::new("address", DataType::FixedSizeBinary(20), true), + ])); + let batch = RecordBatch::new_empty(schema.clone()); + let table = MemTable::try_new(schema, vec![vec![batch]]).unwrap(); + LogicalPlanBuilder::scan("logs", provider_as_source(Arc::new(table)), None) + .unwrap() + .build() + .unwrap() + } + + /// Helper: create an `evm_decode_log(topic1, topic2, topic3, data, sig)` expression. + fn evm_decode_expr(sig: &str) -> Expr { + let udf: datafusion::logical_expr::ScalarUDF = EvmDecodeLog::new().into(); + Expr::ScalarFunction(ScalarFunctionExpr { + func: Arc::new(udf), + args: vec![ + Expr::Column(Column::new_unqualified("topic1")), + Expr::Column(Column::new_unqualified("topic2")), + Expr::Column(Column::new_unqualified("topic3")), + Expr::Column(Column::new_unqualified("data")), + Expr::Literal(ScalarValue::Utf8(Some(sig.to_string())), None), + ], + }) + } + + /// Helper: wrap `inner_expr["field_name"]` using get_field. + fn get_field_expr(inner: Expr, field_name: &str) -> Expr { + datafusion::functions::core::expr_fn::get_field( + inner, + ScalarValue::Utf8(Some(field_name.to_string())), + ) + } + + const TRANSFER_SIG: &str = "Transfer(address indexed from, address indexed to, uint256 value)"; + + // ------------------------------------------------------------------ + // Test 1: basic equality on indexed address field (to → topic2) + // ------------------------------------------------------------------ + #[test] + fn test_eq_indexed_address_to() { + let scan = logs_scan(); + let dec = evm_decode_expr(TRANSFER_SIG); + let plan = + LogicalPlanBuilder::from(scan) + .project(vec![dec.clone().alias("dec")]) + .unwrap() + .filter(get_field_expr(dec.clone(), "to").eq(scalar_lit( + ScalarValue::FixedSizeBinary(20, Some(vec![0xba; 20])), + ))) + .unwrap() + .build() + .unwrap(); + + let rewritten = filter_pushthrough_decode(plan).unwrap(); + let plan_str = format!("{rewritten}"); + + // The derived predicate should reference topic2 and contain the + // left-padded 32-byte encoding of the address. + assert!( + plan_str.contains("topic2"), + "expected topic2 in plan:\n{plan_str}" + ); + } + + // ------------------------------------------------------------------ + // Test 2: equality on first indexed parameter (from → topic1) + // ------------------------------------------------------------------ + #[test] + fn test_eq_indexed_address_from() { + let scan = logs_scan(); + let dec = evm_decode_expr(TRANSFER_SIG); + let plan = LogicalPlanBuilder::from(scan) + .project(vec![dec.clone().alias("dec")]) + .unwrap() + .filter(get_field_expr(dec.clone(), "from").eq(scalar_lit( + ScalarValue::FixedSizeBinary(20, Some(vec![0xab; 20])), + ))) + .unwrap() + .build() + .unwrap(); + + let rewritten = filter_pushthrough_decode(plan).unwrap(); + let plan_str = format!("{rewritten}"); + + assert!( + plan_str.contains("topic1"), + "expected topic1 in plan:\n{plan_str}" + ); + } + + // ------------------------------------------------------------------ + // Test 3: non-indexed field — no rewrite + // ------------------------------------------------------------------ + #[test] + fn test_non_indexed_field_no_rewrite() { + let scan = logs_scan(); + let dec = evm_decode_expr(TRANSFER_SIG); + let plan = LogicalPlanBuilder::from(scan) + .project(vec![dec.clone().alias("dec")]) + .unwrap() + .filter( + get_field_expr(dec.clone(), "value") + .gt(scalar_lit(ScalarValue::UInt64(Some(1000)))), + ) + .unwrap() + .build() + .unwrap(); + + let original_str = format!("{plan}"); + let rewritten = filter_pushthrough_decode(plan).unwrap(); + let rewritten_str = format!("{rewritten}"); + + assert_eq!( + original_str, rewritten_str, + "plan should not be modified for non-indexed field" + ); + } + + // ------------------------------------------------------------------ + // Test 4: indexed reference type (string) — no rewrite + // ------------------------------------------------------------------ + #[test] + fn test_indexed_reference_type_no_rewrite() { + let sig = "DataStored(string indexed key, uint256 value)"; + let scan = logs_scan(); + let dec = evm_decode_expr(sig); + let plan = LogicalPlanBuilder::from(scan) + .project(vec![dec.clone().alias("dec")]) + .unwrap() + .filter( + get_field_expr(dec.clone(), "key") + .eq(scalar_lit(ScalarValue::Utf8(Some("hello".to_string())))), + ) + .unwrap() + .build() + .unwrap(); + + let original_str = format!("{plan}"); + let rewritten = filter_pushthrough_decode(plan).unwrap(); + let rewritten_str = format!("{rewritten}"); + + assert_eq!( + original_str, rewritten_str, + "plan should not be modified for indexed reference type" + ); + } + + // ------------------------------------------------------------------ + // Test 5: multiple decoded-field filters (from AND to) + // ------------------------------------------------------------------ + #[test] + fn test_multiple_decoded_field_filters() { + let scan = logs_scan(); + let dec = evm_decode_expr(TRANSFER_SIG); + let plan = LogicalPlanBuilder::from(scan) + .project(vec![dec.clone().alias("dec")]) + .unwrap() + .filter( + get_field_expr(dec.clone(), "from") + .eq(scalar_lit(ScalarValue::FixedSizeBinary( + 20, + Some(vec![0xaa; 20]), + ))) + .and(get_field_expr(dec.clone(), "to").eq(scalar_lit( + ScalarValue::FixedSizeBinary(20, Some(vec![0xbb; 20])), + ))), + ) + .unwrap() + .build() + .unwrap(); + + let rewritten = filter_pushthrough_decode(plan).unwrap(); + let plan_str = format!("{rewritten}"); + + assert!( + plan_str.contains("topic1"), + "expected topic1 in plan:\n{plan_str}" + ); + assert!( + plan_str.contains("topic2"), + "expected topic2 in plan:\n{plan_str}" + ); + } + + // ------------------------------------------------------------------ + // Test 6: bool indexed parameter + // ------------------------------------------------------------------ + #[test] + fn test_bool_indexed_parameter() { + let sig = "StatusChanged(bool indexed active, uint256 value)"; + let scan = logs_scan(); + let dec = evm_decode_expr(sig); + let plan = LogicalPlanBuilder::from(scan) + .project(vec![dec.clone().alias("dec")]) + .unwrap() + .filter( + get_field_expr(dec.clone(), "active") + .eq(scalar_lit(ScalarValue::Boolean(Some(true)))), + ) + .unwrap() + .build() + .unwrap(); + + let rewritten = filter_pushthrough_decode(plan).unwrap(); + let plan_str = format!("{rewritten}"); + + // Derived predicate should encode true as ...0001. + assert!( + plan_str.contains("topic1"), + "expected topic1 in plan:\n{plan_str}" + ); + } + + // ------------------------------------------------------------------ + // Test 7: uint256 indexed parameter + // ------------------------------------------------------------------ + #[test] + fn test_uint256_indexed_parameter() { + let sig = "TokenMinted(uint256 indexed id, address to)"; + let scan = logs_scan(); + let dec = evm_decode_expr(sig); + let plan = LogicalPlanBuilder::from(scan) + .project(vec![dec.clone().alias("dec")]) + .unwrap() + .filter(get_field_expr(dec.clone(), "id").eq(scalar_lit(ScalarValue::UInt64(Some(42))))) + .unwrap() + .build() + .unwrap(); + + let rewritten = filter_pushthrough_decode(plan).unwrap(); + let plan_str = format!("{rewritten}"); + + assert!( + plan_str.contains("topic1"), + "expected topic1 in plan:\n{plan_str}" + ); + } + + // ------------------------------------------------------------------ + // Test 8: IN list + // ------------------------------------------------------------------ + #[test] + fn test_in_list() { + let scan = logs_scan(); + let dec = evm_decode_expr(TRANSFER_SIG); + let plan = LogicalPlanBuilder::from(scan) + .project(vec![dec.clone().alias("dec")]) + .unwrap() + .filter(Expr::InList(InList::new( + Box::new(get_field_expr(dec.clone(), "to")), + vec![ + scalar_lit(ScalarValue::FixedSizeBinary(20, Some(vec![0xaa; 20]))), + scalar_lit(ScalarValue::FixedSizeBinary(20, Some(vec![0xbb; 20]))), + ], + false, + ))) + .unwrap() + .build() + .unwrap(); + + let rewritten = filter_pushthrough_decode(plan).unwrap(); + let plan_str = format!("{rewritten}"); + + assert!( + plan_str.contains("topic2"), + "expected topic2 in plan:\n{plan_str}" + ); + assert!( + plan_str.contains("IN"), + "expected IN list in plan:\n{plan_str}" + ); + } + + // ------------------------------------------------------------------ + // ABI encoding unit tests + // ------------------------------------------------------------------ + #[test] + fn test_encode_address() { + let addr = vec![0xba; 20]; + let word = encode_to_topic(&DynSolType::Address, &addr).unwrap(); + assert_eq!(&word[..12], &[0u8; 12], "first 12 bytes should be zero"); + assert_eq!(&word[12..], &addr[..], "last 20 bytes should be address"); + } + + #[test] + fn test_encode_bool_true() { + let word = encode_to_topic(&DynSolType::Bool, &[1]).unwrap(); + assert_eq!(word[31], 1); + assert_eq!(&word[..31], &[0u8; 31]); + } + + #[test] + fn test_encode_bool_false() { + let word = encode_to_topic(&DynSolType::Bool, &[0]).unwrap(); + assert_eq!(word, [0u8; 32]); + } + + #[test] + fn test_encode_uint() { + // 42 = 0x2a + let word = encode_to_topic(&DynSolType::Uint(256), &[0x2a]).unwrap(); + assert_eq!(&word[..31], &[0u8; 31]); + assert_eq!(word[31], 0x2a); + } + + #[test] + fn test_encode_int_negative() { + // -1 in int8 = 0xFF + let word = encode_to_topic(&DynSolType::Int(8), &[0xFF]).unwrap(); + // Sign-extended: all bytes should be 0xFF. + assert_eq!(word, [0xFF; 32]); + } + + #[test] + fn test_encode_fixed_bytes() { + let val = vec![0xDE, 0xAD]; + let word = encode_to_topic(&DynSolType::FixedBytes(4), &val).unwrap(); + assert_eq!(&word[..2], &[0xDE, 0xAD]); + assert_eq!(&word[2..], &[0u8; 30]); + } + + // ------------------------------------------------------------------ + // Column-reference resolution through projection + // ------------------------------------------------------------------ + #[test] + fn test_column_ref_through_projection() { + let scan = logs_scan(); + let dec = evm_decode_expr(TRANSFER_SIG); + + // Build: Filter(dec['to'] = addr) -> SubqueryAlias(pc) -> Projection(evm_decode as dec) + let projection = LogicalPlanBuilder::from(scan) + .project(vec![dec.alias("dec")]) + .unwrap() + .alias("pc") + .unwrap() + .build() + .unwrap(); + + let dec_col = Expr::Column(Column::new(Some("pc"), "dec")); + let plan = LogicalPlanBuilder::from(projection) + .filter( + get_field_expr(dec_col, "to").eq(scalar_lit(ScalarValue::FixedSizeBinary( + 20, + Some(vec![0xba; 20]), + ))), + ) + .unwrap() + .build() + .unwrap(); + + let rewritten = filter_pushthrough_decode(plan).unwrap(); + let plan_str = format!("{rewritten}"); + + assert!( + plan_str.contains("topic2"), + "expected topic2 in plan:\n{plan_str}" + ); + } + + // ------------------------------------------------------------------ + // Integration test: SQL → plan → optimize, verify topic predicate + // ------------------------------------------------------------------ + #[tokio::test] + async fn test_sql_explain_plan_contains_topic_predicate() { + use datafusion::{logical_expr::ScalarUDF, prelude::SessionContext}; + + use crate::udfs::evm_decode_log::EvmDecodeLog; + + // Set up a session with the logs table and evm_decode_log UDF. + let schema = Arc::new(Schema::new(vec![ + Field::new("topic0", DataType::FixedSizeBinary(32), true), + Field::new("topic1", DataType::FixedSizeBinary(32), true), + Field::new("topic2", DataType::FixedSizeBinary(32), true), + Field::new("topic3", DataType::FixedSizeBinary(32), true), + Field::new("data", DataType::Binary, true), + Field::new("address", DataType::FixedSizeBinary(20), true), + ])); + let batch = RecordBatch::new_empty(schema.clone()); + let table = Arc::new(MemTable::try_new(schema, vec![vec![batch]]).unwrap()); + + let ctx = SessionContext::new(); + ctx.register_udf(ScalarUDF::from(EvmDecodeLog::new())); + // Also register the deprecated "evm_decode" alias. + ctx.register_udf(ScalarUDF::from(EvmDecodeLog::new().with_deprecated_name())); + ctx.register_table("logs", table).unwrap(); + + // Plan a query that filters on an indexed decoded field. + let sql = "\ + SELECT pc.dec['to'] AS to_address \ + FROM ( \ + SELECT evm_decode(topic1, topic2, topic3, data, \ + 'Transfer(address indexed from, address indexed to, uint256 value)') AS dec \ + FROM logs \ + ) pc \ + WHERE pc.dec['to'] = X'bafead7c60ea473758ed6c6021505e8bbd7e8e5d'"; + + let plan = ctx + .sql(sql) + .await + .expect("should parse SQL") + .logical_plan() + .clone(); + + // Apply the filter-pushthrough rewrite, then DF optimization. + let rewritten = filter_pushthrough_decode(plan).unwrap(); + let optimized = ctx.state().optimize(&rewritten).unwrap(); + + let plan_str = format!("{}", optimized.display_indent()); + eprintln!("optimized plan:\n{plan_str}"); + + // The derived topic2 predicate should appear in the optimized plan, + // pushed down close to the scan. + assert!( + plan_str.contains("topic2"), + "expected derived topic2 predicate in optimized plan:\n{plan_str}" + ); + } +} diff --git a/crates/core/common/src/lib.rs b/crates/core/common/src/lib.rs index ebf093a29..7eab0811a 100644 --- a/crates/core/common/src/lib.rs +++ b/crates/core/common/src/lib.rs @@ -11,6 +11,7 @@ pub mod dataset_schema_provider; pub mod datasets_cache; pub mod detached_logical_plan; pub mod exec_env; +pub mod filter_pushthrough_decode; pub mod func_catalog; pub mod incrementalizer; pub mod memory_pool; diff --git a/crates/core/common/src/udfs.rs b/crates/core/common/src/udfs.rs index 9e31c4fce..b8a7b19f3 100644 --- a/crates/core/common/src/udfs.rs +++ b/crates/core/common/src/udfs.rs @@ -15,7 +15,7 @@ use datafusion::logical_expr::ScalarUDF; pub mod block_num; pub mod eth_call; -mod evm_common; +pub(crate) mod evm_common; mod evm_decode_fast; pub mod evm_decode_hex; pub mod evm_decode_log; diff --git a/crates/core/common/src/udfs/evm_common.rs b/crates/core/common/src/udfs/evm_common.rs index d0c38ea3e..2b5270bfc 100644 --- a/crates/core/common/src/udfs/evm_common.rs +++ b/crates/core/common/src/udfs/evm_common.rs @@ -226,6 +226,22 @@ impl Event { Ok(Fields::from(fields)) } + /// Returns the names of the indexed (topic) parameters. + pub fn topic_names(&self) -> &[String] { + &self.topic_names + } + + /// Looks up a field by name among the indexed parameters. + /// + /// Returns `Some((topic_index_0based, solidity_type))` if the field is an + /// indexed parameter, `None` otherwise. + pub fn find_indexed_param(&self, field_name: &str) -> Option<(usize, &DynSolType)> { + self.topic_names + .iter() + .position(|n| n == field_name) + .map(|i| (i, &self.topic_types[i])) + } + pub fn topic_types(&self) -> &[DynSolType] { &self.topic_types } From 940c620e617b7b8f5f51582c21d75113a6603d8c Mon Sep 17 00:00:00 2001 From: Ford Date: Thu, 26 Mar 2026 20:41:18 -0700 Subject: [PATCH 2/2] feat(common): add tracing span for filter pushthrough decode step Also apply formatting --- .../core/common/src/context/session_state.rs | 5 ++++- .../common/src/filter_pushthrough_decode.rs | 18 +++++++++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/crates/core/common/src/context/session_state.rs b/crates/core/common/src/context/session_state.rs index 4770b8d7f..b1f9c01b0 100644 --- a/crates/core/common/src/context/session_state.rs +++ b/crates/core/common/src/context/session_state.rs @@ -134,7 +134,10 @@ impl SessionState { /// Mirrors [`DfSessionState::optimize`] (synchronous). #[tracing::instrument(skip_all, err)] pub fn optimize(&self, plan: &LogicalPlan) -> Result { - let plan = crate::filter_pushthrough_decode::filter_pushthrough_decode(plan.clone())?; + let plan = { + let _span = tracing::debug_span!("filter_pushthrough_decode").entered(); + crate::filter_pushthrough_decode::filter_pushthrough_decode(plan.clone())? + }; self.state.optimize(&plan) } diff --git a/crates/core/common/src/filter_pushthrough_decode.rs b/crates/core/common/src/filter_pushthrough_decode.rs index 31da4d2f7..471d08993 100644 --- a/crates/core/common/src/filter_pushthrough_decode.rs +++ b/crates/core/common/src/filter_pushthrough_decode.rs @@ -66,15 +66,17 @@ impl TreeNodeRewriter for FilterPushthroughDecode { Derived::Local(pred) => { new_predicate = new_predicate.and(pred); } - Derived::Deep { col_name, predicate } => { + Derived::Deep { + col_name, + predicate, + } => { new_input = inject_filter_below_decode_projection(new_input, &col_name, predicate); } } } - let new_filter = - LogicalPlan::Filter(Filter::try_new(new_predicate, Arc::new(new_input))?); + let new_filter = LogicalPlan::Filter(Filter::try_new(new_predicate, Arc::new(new_input))?); Ok(Transformed::yes(new_filter)) } } @@ -142,7 +144,10 @@ fn try_derive_eq( Some(match resolved.col_name { None => Derived::Local(predicate), - Some(col_name) => Derived::Deep { col_name, predicate }, + Some(col_name) => Derived::Deep { + col_name, + predicate, + }, }) } @@ -176,7 +181,10 @@ fn try_derive_in_list( Some(match resolved.col_name { None => Derived::Local(predicate), - Some(col_name) => Derived::Deep { col_name, predicate }, + Some(col_name) => Derived::Deep { + col_name, + predicate, + }, }) }