[SPARK-56046][SQL] Typed SPJ partition key Reducers #54884
peter-toth wants to merge 11 commits into apache:master
```diff
 object YearsFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] {
   override def inputTypes(): Array[DataType] = Array(TimestampType)
-  override def resultType(): DataType = LongType
+  override def resultType(): DataType = IntegerType
```
I changed the test `years` transform to return `IntegerType` and the test `days` transform to return `DateType` logical types, because those two differ but have the same `PhysicalIntegerType` physical type.
I also made `days` reducible to `years`, which is very similar to what Iceberg can do with hours and days.
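To make the reducibility concrete, here is a minimal self-contained sketch (the `Reducer` trait and `DaysToYearsReducer` below are simplified stand-ins, not Spark's actual `org.apache.spark.sql.connector.catalog.functions.Reducer` API):

```scala
import java.time.LocalDate

// Simplified stand-in for Spark's connector Reducer interface.
trait Reducer[I, O] {
  def reduce(key: I): O
}

// Hypothetical reducer: a `days` partition key (days since 1970-01-01, the
// physical Int representation of DateType) reduces to a `years` key (years
// since 1970, an IntegerType key), analogous to reducing `hours` to `days`.
object DaysToYearsReducer extends Reducer[Int, Int] {
  override def reduce(days: Int): Int =
    LocalDate.ofEpochDay(days.toLong).getYear - 1970
}
```

Two tables partitioned by `days` and `years` can then be storage-partition joined by reducing the finer `days` keys to `years` keys on one side.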
cc @szehon-ho, @dongjoon-hyun
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
```scala
      .createWithDefault(false)

  val V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES =
    buildConf("spark.sql.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled")
```
Do you think we can set this configuration to false for some cases in the future, @peter-toth? I'm a little confused about when it makes sense to disallow incompatible transform types.
This is a good question and I was thinking about it too. I feel we should not compare different logical types due to their different semantic meanings, but seemingly this is what we currently do in some cases, so we should probably keep the behavior for now. I think in a future Spark release we can change this config to make sure a comparison makes sense.
Yeah, I'm also thinking that if there is some dangerous discrepancy now, it is worth a behavior change to fix it.
The only consumer that I know of is Iceberg, which has an hoursToDay reducer that changes type, and a bucket reducer (which doesn't change type). Iceberg will need to recompile against Spark 4.2 anyway, so it's an opportunity for us to fix it there.
WDYT (as regards the Spark release policy)?
Yeah, very likely Iceberg is the only project that implemented reducers.
If we are ok with fixing the issue in Iceberg, then probably we don't need the latest commit, but we can keep `resultType()` in `Reducer`, remove its default value, and drop this config.
I'm actively testing Spark 4.2.0 integration in Iceberg. The issue was only in 4.2.0-preview3 and I can work on the Iceberg changes for the next preview release. +1 to drop this config.
In effect, every Iceberg release has jars for Spark 4.0, 4.1, 4.2, etc.
So in effect, Iceberg 1.11 (or Iceberg 1.12) with Spark 4.2 will be a new jar, and it can start fresh (the affected Reducer will just implement the new interface). As per the Iceberg release policy, old Iceberg branches (i.e., 1.10 / 1.09 / etc.) will never have Spark 4.2 support. So I still feel this is a bit overkill here, but will defer if you feel strongly.
> keep resultType() in Reducer, remove its default value and drop this config.
I prefer this way; the `Reducer` is marked as `@Evolving`, so I suppose such a change is acceptable.
From the connector developer's perspective, two interfaces introduce an additional understanding cost (the reducer API is already a relatively complex part compared to other DSv2 APIs). In addition, adding a method to the interface lets developers write an impl that compiles against Spark 4.2 and also keeps binary compatibility with Spark 4.1.
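A small sketch of that compatibility argument (toy traits, not the real `Reducer` interface; `resultType()` returns a `String` here as a stand-in for `DataType`): an implementation that overrides the newly added method still satisfies the old interface, so one class can serve both API versions.

```scala
// "Old" interface shape: reduce() only.
trait ReducerV1[I, O] {
  def reduce(key: I): O
}

// "New" interface shape: adds resultType() with no default value, so
// connector implementations are forced to declare the reduced key type.
trait ReducerV2[I, O] extends ReducerV1[I, O] {
  def resultType(): String // stand-in for Spark's DataType
}

// An Iceberg-like hours-to-days reducer implementing the new shape; it can
// still be used anywhere only ReducerV1 is expected.
object HoursToDaysReducer extends ReducerV2[Int, Int] {
  override def reduce(hours: Int): Int = Math.floorDiv(hours, 24)
  override def resultType(): String = "DateType"
}
```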
In that case, I'm fine either way. Feel free to choose. :)
@manuzhang as you are working on the Spark 4.2 snapshot on the Iceberg side, let's make sure the existing Reducer implements the new method.
edit: sorry, just saw your comment above
cc @aokolnychyi, @cloud-fan, @gengliangwang, too.
...ore/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
Thank you for catching this and providing a fix promptly, @peter-toth.
cc @szehon-ho as well
I'm taking a look, thanks
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java
The previous behavior of always using the left side key type is indeed problematic, but the new rule looks too strict. Is it possible to follow the behavior of join key type mismatch handling? When a join has an equi-join condition with mismatched key types, the keys are coerced to a common type instead of failing.
I think this is a slightly different issue from type coercion.
@peter-toth, this sounds reasonable; maybe we should emphasize that in the javadocs?
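For reference, the join key handling mentioned above works roughly like this toy sketch (illustrative only; Spark's real logic lives in its TypeCoercion rules): mismatched join key types are resolved by casting both sides to a common wider type when one exists.

```scala
// Toy logical types and a toy "find common wider type" for join keys.
sealed trait ToyType
case object IntT  extends ToyType
case object LongT extends ToyType
case object DateT extends ToyType

def widerType(l: ToyType, r: ToyType): Option[ToyType] = (l, r) match {
  case (a, b) if a == b              => Some(a)
  case (IntT, LongT) | (LongT, IntT) => Some(LongT)
  // In this sketch there is no implicit coercion between DateT and IntT,
  // which mirrors why reduced-key types cannot simply be coerced.
  case _                             => None
}
```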
Fixed in 595d59e.
+1 from my side.
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java
```scala
        !conf.v2BucketingAllowIncompatibleTransformTypes ||
          leftReducedDataTypes.map(PhysicalDataType(_)) !=
            rightReducedDataTypes.map(PhysicalDataType(_)))) {
        throw new SparkException("Storage-partition join partition transforms produced " +
```
Is there any error code/class? Again, I feel it's overkill, but maybe we should do it if we keep this approach. Also maybe we can use `Reducer.displayName`.
...ore/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
```scala
   * Returns the reduced keys and their data types.
   */
  def reduceKeys(reducers: Seq[Option[Reducer[_, _]]]): Seq[InternalRowComparableWrapper] =
    KeyedPartitioning.reduceKeys(partitionKeys, expressionDataTypes, reducers).distinct
```
This `.distinct` was moved to `mergeAndDedupPartitions()`.
I changed the implementation in b82df20 as we discussed in this thread: #54884 (comment), and updated the PR description.
szehon-ho
left a comment
LGTM, thanks! Left one comment, but feel free to merge as it's quite minor.
```scala
      Seq(
        s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time",
        s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time"
      ).foreach { joinSting =>
```
gengliangwang
left a comment
Clean PR — the architecture is well-designed and the type information flows correctly through all layers (Reducer → KeyedPartitioning → EnsureRequirements/GroupPartitionsExec).
```scala
  override def canonicalName(): String = name()
}

// This `days` function reduces `DateType` partitions keys to `IntegerType` partitions keys when
```
Typo: "partitions keys" → "partition keys"
```diff
-// This `days` function reduces `DateType` partitions keys to `IntegerType` partitions keys when
+// This `days` function reduces `DateType` partition keys to `IntegerType` partition keys when
```
```scala
  * @param leftPartitionKeys left side partition keys
  * @param rightPartitionKeys right side partition keys
  * @param joinType join type for optional partition filtering
  * @keyOrdering ordering to sort partition keys
```
Pre-existing, but since this doc block was updated: @keyOrdering is not a valid Scaladoc tag.
```diff
-  * @keyOrdering ordering to sort partition keys
+  * @param keyOrdering ordering to sort partition keys
```
What changes were proposed in this pull request?
This PR adds a new method to SPJ partition key `Reducer`s to return the type of a reduced partition key.

Why are the changes needed?
After the SPJ refactor (KeyGroupedPartitioning and Storage Partition Join #54330), some Iceberg SPJ tests that join an `hours` transform partitioned table with a `days` transform partitioned table started to fail. This is because after the refactor the keys of a `KeyedPartitioning` are `InternalRowComparableWrapper`s, which include the type of the key, and when the partition keys are reduced, the type of the reduced keys is inherited from their original type.
This means that when `hours` transformed keys are reduced to days, the keys actually keep their `IntegerType` type, while the `days` transformed keys have `DateType` type in Iceberg. This type difference causes the left and right side `InternalRowComparableWrapper`s to not be considered equal despite their raw `InternalRow` key data being equal.

Before the refactor, the type of (possibly reduced) partition keys was not stored in the partitioning. When the left and right side raw keys were compared in `EnsureRequirements`, a common comparator was initialized with the type of the left side keys.
So in the Iceberg SPJ tests the `IntegerType` keys were forced to be interpreted as `DateType`, or the `DateType` keys were forced to be interpreted as `IntegerType`, depending on the join order of the tables.

The reason why this did not cause any issues is that the `PhysicalDataType` of both the `DateType` and `IntegerType` logical types is `PhysicalIntegerType`.
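The mismatch can be illustrated with a self-contained sketch (toy classes, not Spark's actual `InternalRowComparableWrapper` or `PhysicalDataType`): the wrappers compare unequal because the logical types differ, even though the raw values and the physical types match.

```scala
// Toy logical types that share one physical representation.
sealed trait LogicalType { def physical: String }
case object IntType  extends LogicalType { val physical = "PhysicalIntegerType" }
case object DateType extends LogicalType { val physical = "PhysicalIntegerType" }

// Toy wrapper: equality includes the logical type, like the post-refactor
// InternalRowComparableWrapper includes the key's data type.
final case class KeyWrapper(raw: Int, dataType: LogicalType)

val left  = KeyWrapper(19000, IntType)  // e.g. a reduced `hours` key
val right = KeyWrapper(19000, DateType) // e.g. a `days` key

val sameRaw      = left.raw == right.raw                             // true
val samePhysical = left.dataType.physical == right.dataType.physical // true
val equalKeys    = left == right                                     // false
```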
This PR introduces a new `resultType()` method on `Reducer` to return the correct type of the reduced keys, properly compares the left and right side reduced key types, and throws an error when they are not the same.

Does this PR introduce any user-facing change?
Yes, the reduced key types are now properly compared and incompatibilities are reported to users.
How was this patch tested?
Added new UTs.
Was this patch authored or co-authored using generative AI tooling?
No.