
[SPARK-56046][SQL] Typed SPJ partition key Reducers #54884

Open
peter-toth wants to merge 11 commits into apache:master from peter-toth:SPARK-56046-typed-spj-reducers

Conversation

Contributor

@peter-toth peter-toth commented Mar 18, 2026

What changes were proposed in this pull request?

This PR adds a new method to SPJ partition key Reducers to return the type of a reduced partition key.

Why are the changes needed?

After the SPJ refactor, some Iceberg SPJ tests that join an hours-transform-partitioned table with a days-transform-partitioned table started to fail. This is because after the refactor the keys of a KeyedPartitioning are InternalRowComparableWrappers, which include the type of the key, and when partition keys are reduced, the reduced keys inherit the type of their original keys.

This means that when hours-transformed keys are reduced to days, the reduced keys keep their IntegerType, while the days-transformed keys have DateType in Iceberg. This type difference causes the left and right side InternalRowComparableWrappers to be considered unequal even though their raw InternalRow key data are equal.

Before the refactor, the types of (possibly reduced) partition keys were not stored in the partitioning. When the left and right side raw keys were compared in EnsureRequirements, a common comparator was initialized with the type of the left side keys.
So in the Iceberg SPJ tests the IntegerType keys were forced to be interpreted as DateType, or the DateType keys were forced to be interpreted as IntegerType, depending on the join order of the tables.
The reason why this did not cause any issues is that the PhysicalDataType of both the DateType and IntegerType logical types is PhysicalIntegerType.
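As a simplified model (these are not the actual Spark classes), the wrapper equality that now includes the key type can be sketched like this — the raw values match, but the wrappers do not:

```scala
// Simplified model of an InternalRowComparableWrapper-style key: the raw
// value plus its logical type. DataTypeModel stands in for Spark's DataType.
sealed trait DataTypeModel
case object IntegerTypeModel extends DataTypeModel
case object DateTypeModel extends DataTypeModel

case class KeyWrapper(rawValue: Int, dataType: DataTypeModel)

object SpjKeyMismatchSketch {
  // An `hours` key reduced to days keeps IntegerType, while a native `days`
  // key carries DateType: same raw value, different wrapper.
  val reducedHoursKey = KeyWrapper(19000, IntegerTypeModel)
  val daysKey = KeyWrapper(19000, DateTypeModel)

  def sameRawValue: Boolean = reducedHoursKey.rawValue == daysKey.rawValue
  def sameWrapper: Boolean = reducedHoursKey == daysKey
}
```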

This PR introduces a new resultType() method on Reducer to return the correct type of the reduced keys, properly compares the left and right side reduced key types, and throws an error when they are not the same.
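A minimal sketch of the new method's shape (DataTypeModel and the reducer below are hypothetical stand-ins for illustration, not the actual DSv2 interface):

```scala
// Hypothetical stand-in for Spark's DataType.
sealed trait DataTypeModel
case object IntegerTypeModel extends DataTypeModel

trait ReducerSketch[I, O] {
  def reduce(input: I): O
  // New in this PR: the type of the reduced partition key.
  def resultType(): DataTypeModel
}

// Illustrative reducer from hours-since-epoch keys to days-since-epoch keys.
object HoursToDaysReducer extends ReducerSketch[Int, Int] {
  override def reduce(hours: Int): Int = hours / 24
  override def resultType(): DataTypeModel = IntegerTypeModel
}
```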

Does this PR introduce any user-facing change?

Yes, the reduced key types are now properly compared and incompatibilities are reported to users.

How was this patch tested?

Added new UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@peter-toth peter-toth force-pushed the SPARK-56046-typed-spj-reducers branch from 580ca49 to fa4bce7 Compare March 18, 2026 13:54
@peter-toth peter-toth marked this pull request as draft March 18, 2026 16:24
object YearsFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] {
override def inputTypes(): Array[DataType] = Array(TimestampType)
-override def resultType(): DataType = LongType
+override def resultType(): DataType = IntegerType
Contributor Author

@peter-toth peter-toth Mar 18, 2026

I changed the test years transform to return IntegerType and the test days transform to return DateType logical types, because those two differ but have the same PhysicalIntegerType physical type.
I also made days reducible to years, which is very similar to what Iceberg can do with hours and days.

@peter-toth
Contributor Author

cc @szehon-ho , @dongjoon-hyun

@peter-toth peter-toth marked this pull request as ready for review March 18, 2026 17:20
.createWithDefault(false)

val V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES =
buildConf("spark.sql.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled")
Member

Do you think we can set this configuration to false for some cases in the future, @peter-toth? I'm a little confused about when it makes sense to disallow incompatible transform types.

Contributor Author

This is a good question and I too was thinking about it. I feel we should not compare different logical types due to their different semantic meanings, but seemingly this is what we currently do in some cases, so we should probably keep the behavior for now. I think in a future Spark release we can change this config to make sure a comparison makes sense.

Member

@szehon-ho szehon-ho Mar 19, 2026

yea im also thinking, if there is some dangerous discrepancy now, it is worth a behavior change to fix it.

The only consumer that i know of is Iceberg, which has an hoursToDay reducer that changes type, and a bucketReducer (which doesn't change type). Iceberg will need to recompile against Spark 4.2 anyway, so it's an opportunity for us to fix it there.

wdyt (as regards the Spark release policy)?

Contributor Author

Yeah, very likely Iceberg is the only project that implemented reducers.

If we are ok with fixing the issue in Iceberg then probably we don't need the latest commit, but we can keep resultType() in Reducer, remove its default value and drop this config.

Member

I'm actively testing Spark 4.2.0 integration in Iceberg. The issue was only in 4.2.0-preview3 and I can work on the Iceberg changes for next preview release. +1 to drop this config.

Member

In effect, every Iceberg release has jars for Spark 4.0, 4.1, 4.2, etc.

So Iceberg 1.11 (or Iceberg 1.12) with Spark 4.2 will be a new jar, and it can start fresh (the affected Reducer will just implement the new interface). As per Iceberg release policy, old Iceberg branches (i.e. 1.10 / 1.09 / etc.) will never have Spark 4.2 support. So I still feel this is a bit overkill here, but will defer if you feel strongly.

Member

keep resultType() in Reducer, remove its default value and drop this config.

I prefer this way, the Reducer is marked as @Evolving, so I suppose such a change is acceptable.

From the connector developer's perspective, two interfaces introduce an additional understanding cost (the reducer API is already a relatively complex part compared to other DSv2 APIs). In addition, adding a method to the interface lets developers write an impl that compiles against Spark 4.2 and also keeps binary compatibility with Spark 4.1.

Member

In that case, I'm fine either way. Feel free to choose. :)

Contributor Author

Changed the implementation in b82df20.

Member

@szehon-ho szehon-ho Mar 20, 2026

@manuzhang as you are working on the Spark 4.2 snapshot on the Iceberg side, let's make sure the existing Reducer implements the new method

edit: sorry, just saw your comment above

@dongjoon-hyun
Member

cc @aokolnychyi , @cloud-fan , @gengliangwang , too.

@dongjoon-hyun
Member

dongjoon-hyun commented Mar 18, 2026

Thank you for catching this and providing a fix promptly, @peter-toth .
I'll leave this to the other reviewers.

@gengliangwang
Member

cc @szehon-ho as well

@szehon-ho
Member

im taking a look, thanks

@pan3793
Member

pan3793 commented Mar 19, 2026

Properly compares the left and right side reduced key types and return an error when they are not the same.

The previous behavior of always using the left side key type is indeed problematic, but the new rule looks too strict. Is it possible to follow the behavior of join key type mismatch handling?

When a join has an EqualTo(leftKey, rightKey) condition where types differ, ImplicitTypeCoercion kicks in:

  1. Calls findTightestCommonType(left.dataType, right.dataType) to find a compatible type
  2. Wraps operands in Cast expressions to coerce both to the common type

@peter-toth
Contributor Author

peter-toth commented Mar 19, 2026

When a join has an EqualTo(leftKey, rightKey) condition where types differ, ImplicitTypeCoercion kicks in:

  1. Calls findTightestCommonType(left.dataType, right.dataType) to find a compatible type
  2. Wraps operands in Cast expressions to coerce both to the common type

I think this is a slightly different issue from type coercion, as the ReducibleFunctions on both sides know each other when they return the Reducers. It is the Reducers' responsibility to produce comparable reduced values. The only issue now is that we don't know the type of those values.

@pan3793
Member

pan3793 commented Mar 19, 2026

It is the Reducers' responsibility to produce comparable reduced values.

@peter-toth, this sounds reasonable, maybe we should emphasize that in the javadocs? The = check requires that both value and data type match exactly:

  • ... r(f_source(x)) = f_target(x) ...
  • ... r1(f_source(x)) = r2(f_target(x)) ...
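For illustration, the r(f_source(x)) = f_target(x) contract can be modeled with plain functions over epoch seconds (hypothetical hours/days transforms, not the actual Spark API):

```scala
object ReducerContractSketch {
  // f_source: epoch seconds -> hours since epoch
  def hours(epochSeconds: Long): Long = epochSeconds / 3600L
  // f_target: epoch seconds -> days since epoch
  def days(epochSeconds: Long): Long = epochSeconds / 86400L
  // r: reduces an hours key to a days key
  def hoursToDays(h: Long): Long = h / 24L

  // r(f_source(x)) = f_target(x): the reduced source key must equal the
  // target key in value (and, per this discussion, also in data type).
  def contractHolds(epochSeconds: Long): Boolean =
    hoursToDays(hours(epochSeconds)) == days(epochSeconds)
}
```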

@peter-toth
Contributor Author

It is the Reducers' responsibility to produce comparable reduced values.

@peter-toth, this sounds reasonable, maybe we should emphasize that in the javadocs? The = check requires that both value and data type match exactly:

  • ... r(f_source(x)) = f_target(x) ...
  • ... r1(f_source(x)) = r2(f_target(x)) ...

Fixed in 595d59e.

@dongjoon-hyun
Member

dongjoon-hyun commented Mar 19, 2026

+1 from my side. It would be great if we can have an item in the SQL migration guide as mentioned #54884 (comment) .

!conf.v2BucketingAllowIncompatibleTransformTypes ||
leftReducedDataTypes.map(PhysicalDataType(_)) !=
rightReducedDataTypes.map(PhysicalDataType(_)))) {
throw new SparkException("Storage-partition join partition transforms produced " +
Member

@szehon-ho szehon-ho Mar 19, 2026

Is there any error code/class? Again, I feel it's overkill, but maybe we should do it if we keep this approach. Also maybe we can use Reducer.displayName.

Contributor Author

Extracted in this commit: fbf2630.

* Returns the reduced keys and their data types.
*/
def reduceKeys(reducers: Seq[Option[Reducer[_, _]]]): Seq[InternalRowComparableWrapper] =
KeyedPartitioning.reduceKeys(partitionKeys, expressionDataTypes, reducers).distinct
Contributor Author

This .distinct was moved to mergeAndDedupPartitions().

@peter-toth
Contributor Author

peter-toth commented Mar 20, 2026

I changed the implementation in b82df20 as we discussed in this thread: #54884 (comment) and updated the PR description.

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1 once more. :)

Member

@szehon-ho szehon-ho left a comment

LGTM, thanks! Left one comment, but feel free to merge as it's quite minor.

Seq(
s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time",
s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time"
).foreach { joinSting =>
Member

typo: joinString

Member

@gengliangwang gengliangwang left a comment

Clean PR — the architecture is well-designed and the type information flows correctly through all layers (Reducer → KeyedPartitioning → EnsureRequirements/GroupPartitionsExec).

override def canonicalName(): String = name()
}

// This `days` function reduces `DateType` partitions keys to `IntegerType` partitions keys when
Member

Typo: "partitions keys" → "partition keys"

Suggested change
// This `days` function reduces `DateType` partitions keys to `IntegerType` partitions keys when
// This `days` function reduces `DateType` partition keys to `IntegerType` partition keys when

* @param leftPartitionKeys left side partition keys
* @param rightPartitionKeys right side partition keys
* @param joinType join type for optional partition filtering
* @keyOrdering ordering to sort partition keys
Member

Pre-existing, but since this doc block was updated: @keyOrdering is not a valid Scaladoc tag.

Suggested change
* @keyOrdering ordering to sort partition keys
* @param keyOrdering ordering to sort partition keys
