[SPARK-56034][SQL] Push down Join through Union when the right side is broadcastable #54865

LuciferYang wants to merge 6 commits into apache:master
Conversation
```scala
    .createWithDefault(true)

  val PUSH_DOWN_JOIN_THROUGH_UNION_ENABLED =
    buildConf("spark.sql.optimizer.pushDownJoinThroughUnion.enabled")
```
If it needs to be set to false by default, please let me know.
+1 for true by default because this configuration is only a safe-guard for any future regression.
According to the code, we can use spark.sql.optimizer.excludedRules instead of this, right? Is there any difference?
Good point @dongjoon-hyun. You're right — spark.sql.optimizer.excludedRules already provides a general mechanism to disable any optimizer rule, and adding a dedicated config for each rule would lead to config proliferation. I'll remove the dedicated config spark.sql.optimizer.pushDownJoinThroughUnion.enabled and rely on excludedRules instead. Thanks for the suggestion!
cc @yaooqinn and @peter-toth, too.
I might be missing something but I don't get this part:
Why does Spark need to shuffle the union result if the right side is small enough to be broadcast (i.e. the original join was a broadcast join)? Is there a TPCDS plan where an exchange is removed by this PR?
```
* Sort (59)
+- Exchange (58)
   +- * Project (57)
      +- * SortMergeJoin Inner (56)
```
This was BroadcastHashJoin before this PR. Why do we have SortMergeJoin now?
Great catch @peter-toth. I investigated the root cause and it turns out to be a statistics estimation degradation chain triggered by a pre-existing gap in UnionEstimation.
Root cause: UnionEstimation only propagates min/max and nullCount through Union — it does not propagate distinctCount. When PushDownJoinThroughUnion transforms the plan from Aggregate(Join(Union, date_dim)) to Aggregate(Union(Join, Join)), the d_week_seq column loses its distinctCount after passing through the new Union node, which causes AggregateEstimation CBO to fail (since hasCountStats requires both distinctCount and nullCount), falling back to SizeInBytesOnlyStatsPlanVisitor with a vastly inflated estimate.
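To make the gap concrete, here is a minimal sketch (toy types, not Spark's actual `ColumnStat`/`UnionEstimation` API) of why merging one column's stats across Union children can keep min/max and nullCount but must drop distinctCount:

```python
# Toy model of per-column statistics; field names are illustrative,
# not Spark's actual API.
from dataclasses import dataclass
from typing import Optional

@dataclass
class ColStats:
    min: Optional[int]
    max: Optional[int]
    null_count: Optional[int]
    distinct_count: Optional[int]

def merge_for_union(a: ColStats, b: ColStats) -> ColStats:
    """Merge one column's stats across two Union children.

    min/max/null_count combine soundly, but the union's distinct count
    is not derivable from the children's counts alone (their overlap is
    unknown), so a simple estimator has to drop it.
    """
    both = lambda x, y, f: f(x, y) if x is not None and y is not None else None
    return ColStats(
        min=both(a.min, b.min, min),
        max=both(a.max, b.max, max),
        null_count=both(a.null_count, b.null_count, lambda x, y: x + y),
        distinct_count=None,  # true value lies between max(d1, d2) and d1 + d2
    )
```

Once `distinct_count` is `None`, any downstream estimator that requires it (like the `hasCountStats` check described above) fails and falls back to size-only estimation.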
I wrote a simplified reproduction test using TPC-DS sf100 stats to measure the actual impact:
| Metric | BEFORE (`Agg(Join(Union, dd))`) | AFTER (`Agg(Union(Join, Join))`) |
|---|---|---|
| `d_week_seq` distinctCount | `Some(10010)` | `None` (lost by Union) |
| `d_week_seq` hasCountStats | `true` | `false` |
| Aggregate rowCount | `Some(10010)` | `None` (CBO failed) |
| Aggregate sizeInBytes | 195KB | 4.1GB (~21,000x inflation) |
This inflated estimate (4.1GB) far exceeds the broadcast threshold (default 10MB), causing the top-level self-join (year-over-year comparison) to fall from BroadcastHashJoin to SortMergeJoin.
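The strategy flip can be illustrated with a toy size-based decision (the function name is illustrative; the default mirrors `spark.sql.autoBroadcastJoinThreshold`, 10MB):

```python
# Toy join-strategy pick based purely on one side's estimated size,
# mirroring spark.sql.autoBroadcastJoinThreshold (default 10MB).
def pick_join(estimated_size_in_bytes: int,
              threshold_bytes: int = 10 * 1024 * 1024) -> str:
    if estimated_size_in_bytes <= threshold_bytes:
        return "BroadcastHashJoin"
    return "SortMergeJoin"
```

With the accurate ~195KB estimate the self-join stays a BroadcastHashJoin; with the inflated ~4.1GB estimate it degrades to SortMergeJoin.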
Notably, I think this UnionEstimation gap is pre-existing — any GROUP BY ... FROM (... UNION ALL ...) pattern with CBO column stats will lose distinctCount through the Union. I can try to create a case that reproduces this issue without this PR and attempt to fix it separately first.
@peter-toth I have submitted a pull request at #54883 in an attempt to optimize the issues mentioned earlier.
I performed a rebase, and it seems that the SMJ has reverted back to a BHJ.
The change makes sense to me, but let me check this PR thoroughly Monday.
```scala
) match {
  case Join(_, deduped, _, _, _) => deduped
  case other =>
    throw SparkException.internalError(
```
Do any other optimizations guard against bug-like cases with internal errors like this?
Thanks @yaooqinn. Yes, SparkException.internalError is used in several optimizer rules as a defensive guard for "should-never-happen" plan shapes, for example:
- `NestedColumnAliasing`: "Unreasonable plan after optimization: $other"
- `PushExtraPredicateThroughJoin` / `Optimizer`: "Unexpected join type: $other"
- `DecorrelateInnerQuery`: "Unexpected domain join type $o"
- `subquery.scala`: "Unexpected plan when optimizing one row relation subquery: $o"
The dedupRight method here follows the same pattern — it guards against the (theoretically impossible) case where DeduplicateRelations changes the Join plan shape.
That said, InlineCTE uses the same "fake self-join + DeduplicateRelations" approach and simply calls .children(1) directly without any defensive check. I can align with InlineCTE and remove the explicit throw if you think that's cleaner. Alternatively, I could keep the pattern match but return the original plan unchanged in the fallback case (skipping the dedup rather than failing). Which approach would you prefer?
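The "skip rather than fail" alternative could look roughly like this (toy plan types, not the actual rule code):

```python
# Toy logical-plan nodes; illustrative only, not Spark's LogicalPlan.
from dataclasses import dataclass

@dataclass(frozen=True)
class Relation:
    name: str

@dataclass(frozen=True)
class Join:
    left: object
    right: object

def dedup_right_lenient(original, after_dedup):
    """Extract the deduplicated right child from the fake self-join.

    If the dedup pass returned an unexpected shape, fall back to the
    original plan (skipping the dedup) instead of raising an internal error.
    """
    if isinstance(after_dedup, Join):
        return after_dedup.right
    return original
```

The trade-off is silent recovery versus failing loudly on a plan shape that should be impossible.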
```scala
    hint: JoinHint): Boolean = {
  canBroadcastBySize(right, conf) ||
    hint.rightHint.exists(_.strategy.contains(BROADCAST)) ||
    (joinType == Inner && hint.leftHint.exists(_.strategy.contains(BROADCAST)))
```
> the right side is broadcastable

Is this out-of-scope?
Good catch @yaooqinn. You're right — this leftHint check on line 111 is problematic and should be removed.
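With the leftHint branch removed, a toy version of the check (simplified types, not Spark's actual `JoinHint`/`HintInfo` signatures) reduces to testing the right side only:

```python
# Toy stand-ins for Spark's JoinHint/HintInfo; illustrative only.
from dataclasses import dataclass
from typing import Optional

@dataclass
class HintInfo:
    broadcast: bool = False

@dataclass
class JoinHint:
    left_hint: Optional[HintInfo] = None
    right_hint: Optional[HintInfo] = None

def is_right_broadcastable(right_size_in_bytes: int, hint: JoinHint,
                           threshold_bytes: int = 10 * 1024 * 1024) -> bool:
    # Right side is broadcastable if its size estimate is under the
    # threshold or an explicit BROADCAST hint targets the right side;
    # a hint on the left side is deliberately ignored.
    return (right_size_in_bytes <= threshold_bytes
            or (hint.right_hint is not None and hint.right_hint.broadcast))
```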
@dongjoon-hyun @yaooqinn @peter-toth Thank you for your comments. I will carefully review the issues mentioned tomorrow.
Thanks @peter-toth, you are absolutely right. The PR description was incorrect — since the right side is already broadcastable, the original join is a BroadcastHashJoin.
```scala
if conf.getConf(SQLConf.PUSH_DOWN_JOIN_THROUGH_UNION_ENABLED) &&
    (joinType == Inner || joinType == LeftOuter) &&
    joinCond.isDefined &&
    isBroadcastable(joinType, right, hint) &&
```
In PushDownLeftSemiAntiJoin we use canPlanAsBroadcastHashJoin(), can you please check if we could use that here as well?
I think it's feasible. Let me give it a try.
What changes were proposed in this pull request?
This PR adds a new optimizer rule `PushDownJoinThroughUnion` that pushes a Join below a Union, transforming `Join(Union(...), right)` into `Union(Join(..., right), ...)`, when the right side of the join is small enough to broadcast (by size statistics or explicit `BROADCAST` hints). The rule applies to Inner and LeftOuter joins. It is placed after the "Early Filter and Projection Push-Down" batch in the optimizer to ensure accurate data source statistics are available. The rule can be disabled via `spark.sql.optimizer.excludedRules`.

Key implementation details:

- Uses the "fake self-join + `DeduplicateRelations`" pattern (same as `InlineCTE`) to create independent copies of the right subtree with fresh `ExprId`s for each Union branch.
- Note that `DeduplicateRelations` may not correctly handle correlated references when cloning.

Why are the changes needed?
This is a common pattern in TPC-DS queries (e.g., q2, q5, q54, q5a) and real-world analytics workloads: a large fact table is formed by `UNION ALL` of multiple sources and then joined with a small dimension table.

Since the rule only fires when the right side is already broadcastable, the total probe work and output volume are the same before and after the transformation — the same rows are probed and the same rows are produced, just at a different position in the plan tree. The broadcast exchange is materialized once and shared across Union branches via `ReusedExchange`.

The benefit is structural: each Union branch becomes a self-contained subplan, enabling AQE to make independent per-branch adaptive decisions (e.g., coalescing partitions, custom shuffle readers) based on each branch's actual runtime data characteristics.
Does this PR introduce any user-facing change?
Query plans for affected patterns (e.g., TPC-DS q2, q5, q54, q5a) will change — the Join is pushed below the Union, and the broadcast exchange for the right side is shared across Union branches via `ReusedExchange`.

How was this patch tested?
- `PushDownJoinThroughUnionSuite` in `sql/catalyst` (13 test cases): verifies plan transformation for Inner/LeftOuter joins, attribute rewriting, ExprId uniqueness across Union branches, negative cases (unsupported join types, no condition, Union on right side, right side too large), and complex right-side subtrees (Filter+Project, Generate/Explode, SubqueryAlias, Aggregate).
- `PushDownJoinThroughUnionSuite` in `sql/core` (6 test cases): end-to-end correctness tests including 2-way and 3-way UNION ALL with broadcast join, LeftOuter join, optimization enabled vs excluded comparison, column pruning, and predicate push-down interaction.

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code