From fa4bce72fa1c023caf8d8f02677ef77741572fa1 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 18 Mar 2026 12:31:16 +0100 Subject: [PATCH 1/9] [SPARK-56046][SQL] Typed SPJ partition key reducers --- .../connector/catalog/functions/Reducer.java | 9 + .../plans/physical/partitioning.scala | 30 +++- .../apache/spark/sql/internal/SQLConf.scala | 11 ++ .../connector/catalog/InMemoryBaseTable.scala | 6 +- .../datasources/v2/GroupPartitionsExec.scala | 12 +- .../exchange/EnsureRequirements.scala | 64 ++++++- .../KeyGroupedPartitioningSuite.scala | 164 +++++++++++++++++- .../functions/transformFunctions.scala | 121 +++++++++++-- 8 files changed, 384 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java index 561d66092d641..6772b454efc7f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java @@ -17,6 +17,7 @@ package org.apache.spark.sql.connector.catalog.functions; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.types.DataType; /** * A 'reducer' for output of user-defined functions. @@ -39,4 +40,12 @@ @Evolving public interface Reducer { O reduce(I arg); + + /** + * Returns the {@link DataType data type} of values produced by this function. + * It can return null to signal it doesn't change the input type. + * + * @return a data type for values produced by this function. 
+ */ + default DataType resultType() { return null; } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 28a9225b6ce23..7d1c80e3f903e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -464,11 +464,15 @@ case class KeyedPartitioning( KeyedPartitioning.projectKeys(partitionKeys, expressionDataTypes, positions) /** - * Reduces this partitioning's partition keys by applying the given reducers. + * Reduces this partitioning's partition keys by applying the given reducers and use the provided + * types for comparison. * Returns the distinct reduced keys. */ - def reduceKeys(reducers: Seq[Option[Reducer[_, _]]]): Seq[InternalRowComparableWrapper] = - KeyedPartitioning.reduceKeys(partitionKeys, expressionDataTypes, reducers).distinct + def reduceKeys( + reducers: Seq[Option[Reducer[_, _]]], + reducedDataTypes: Seq[DataType]): Seq[InternalRowComparableWrapper] = + KeyedPartitioning.reduceKeys(partitionKeys, expressionDataTypes, reducers, reducedDataTypes) + .distinct override def satisfies0(required: Distribution): Boolean = { nonGroupedSatisfies(required) || groupedSatisfies(required) @@ -581,14 +585,28 @@ object KeyedPartitioning { } /** - * Reduces a sequence of partition keys by applying reducers to each position. + * Reduces a sequence of data types by applying reducers to each position. 
+ */ + def reduceTypes( + dataTypes: Seq[DataType], + reducers: Seq[Option[Reducer[_, _]]]): Seq[DataType] = { + dataTypes.zip(reducers).map { + case (t, Some(reducer: Reducer[Any, Any])) => Option(reducer.resultType()).getOrElse(t) + case (t, _) => t + } + } + + /** + * Reduces a sequence of partition keys by applying reducers to each position and using the + * provided types for comparison. */ def reduceKeys( keys: Seq[InternalRowComparableWrapper], dataTypes: Seq[DataType], - reducers: Seq[Option[Reducer[_, _]]]): Seq[InternalRowComparableWrapper] = { + reducers: Seq[Option[Reducer[_, _]]], + reducedDataTypes: Seq[DataType]): Seq[InternalRowComparableWrapper] = { val comparableKeyWrapperFactory = - InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(dataTypes) + InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(reducedDataTypes) keys.map { key => val keyValues = key.row.toSeq(dataTypes) val reducedKey = keyValues.zip(reducers).map { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fbb67c6d44dcf..5cf833d4cdf9d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2133,6 +2133,14 @@ object SQLConf { .booleanConf .createWithDefault(false) + val V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES = + buildConf("spark.sql.legacy.allowIncompatibleTransformTypes.enabled") + .doc("Whether to allow storage-partition join where the partition transforms produce " + + "incompatible reduced types and use the left partition key type for comparison.") + .version("4.2.0") + .booleanConf + .createWithDefault(true) + val V2_BUCKETING_PARTITION_FILTER_ENABLED = buildConf("spark.sql.sources.v2.bucketing.partition.filter.enabled") .doc(s"Whether to filter partitions when running storage-partition join. 
" + @@ -7692,6 +7700,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def v2BucketingAllowCompatibleTransforms: Boolean = getConf(SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS) + def v2BucketingAllowIncompatibleTransformTypes: Boolean = + getConf(SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES) + def v2BucketingAllowSorting: Boolean = getConf(SQLConf.V2_BUCKETING_SORTING_ENABLED) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index 407d592f82199..e7762565f47ec 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -203,10 +203,10 @@ abstract class InMemoryBaseTable( case YearsTransform(ref) => extractor(ref.fieldNames, cleanedSchema, row) match { case (days: Int, DateType) => - ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days)) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, DateTimeUtils.daysToLocalDate(days)).toInt case (micros: Long, TimestampType) => val localDate = DateTimeUtils.microsToInstant(micros).atZone(UTC).toLocalDate - ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt case (v, t) => throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)") } @@ -225,7 +225,7 @@ abstract class InMemoryBaseTable( case (days, DateType) => days case (micros: Long, TimestampType) => - ChronoUnit.DAYS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros)) + ChronoUnit.DAYS.between(Instant.EPOCH, DateTimeUtils.microsToInstant(micros)).toInt case (v, t) => throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)") } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala index d5260cad9c6ec..2a3d7596219e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala @@ -141,15 +141,21 @@ case class GroupPartitionsExec( )(keyedPartitioning.projectKeys) // Reduce keys if reducers are specified - val reducedKeys = reducers.fold(projectedKeys)( - KeyedPartitioning.reduceKeys(projectedKeys, projectedDataTypes, _)) + val (reducedDataTypes, reducedKeys) = reducers match { + case Some(reducers) => + val reducedDataTypes = KeyedPartitioning.reduceTypes(projectedDataTypes, reducers) + val reducedKeys = KeyedPartitioning.reduceKeys(projectedKeys, projectedDataTypes, reducers, + reducedDataTypes) + (reducedDataTypes, reducedKeys) + case _ => (projectedDataTypes, projectedKeys) + } val keyToPartitionIndices = reducedKeys.zipWithIndex.groupMap(_._1)(_._2) if (expectedPartitionKeys.isDefined) { alignToExpectedKeys(keyToPartitionIndices) } else { - (groupAndSortByKeys(keyToPartitionIndices, projectedDataTypes), true) + (groupAndSortByKeys(keyToPartitionIndices, reducedDataTypes), true) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index cca37558584f0..22da36842b0c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -20,17 +20,20 @@ package org.apache.spark.sql.execution.exchange import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import org.apache.spark.SparkException import 
org.apache.spark.internal.{LogKeys} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.types.PhysicalDataType import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper import org.apache.spark.sql.connector.catalog.functions.Reducer import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.v2.GroupPartitionsExec import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.DataType /** * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]] @@ -509,11 +512,27 @@ case class EnsureRequirements( // in case of compatible but not identical partition expressions, we apply 'reduce' // transforms to group one side's partitions as well as the common partition values val leftReducers = leftSpec.reducers(rightSpec) - val leftReducedKeys = - leftReducers.fold(leftPartitioning.partitionKeys)(leftPartitioning.reduceKeys) val rightReducers = rightSpec.reducers(leftSpec) - val rightReducedKeys = - rightReducers.fold(rightPartitioning.partitionKeys)(rightPartitioning.reduceKeys) + val leftReducedDataTypes = leftReducers.fold(leftPartitioning.expressionDataTypes)( + KeyedPartitioning.reduceTypes(leftPartitioning.expressionDataTypes, _)) + val rightReducedDataTypes = rightReducers.fold(rightPartitioning.expressionDataTypes)( + KeyedPartitioning.reduceTypes(rightPartitioning.expressionDataTypes, _)) + if (leftReducedDataTypes != rightReducedDataTypes && ( + !conf.v2BucketingAllowIncompatibleTransformTypes || + leftReducedDataTypes.map(PhysicalDataType(_)) != + rightReducedDataTypes.map(PhysicalDataType(_)))) { + throw new SparkException("Storage-partition join partition transforms produced " + + s"incompatible reduced 
types, left: $leftReducedDataTypes, right: " + s"$rightReducedDataTypes") + } + val commonDataTypes = leftReducedDataTypes + val leftReducedKeys = leftReducers.fold(leftPartitioning.partitionKeys)( + leftPartitioning.reduceKeys(_, commonDataTypes)) + // As we use left side reduced types as common types for comparison, the right side + // partition keys might need a new comparable wrapper (depending on the legacy flag) + val rightReducedKeys = rightReducers.fold( + rewrapKeys(rightPartitioning.partitionKeys, rightReducedDataTypes, commonDataTypes))( + rightPartitioning.reduceKeys(_, commonDataTypes)) // merge values on both sides var mergedPartitionKeys = @@ -628,10 +647,17 @@ case class EnsureRequirements( } } + val leftMergedPartitionKeys = mergedPartitionKeys + // As we used left side reduced types as common types for comparison, the merged partition + // keys that we push down to the right side might need a new comparable wrapper (depending + // on the legacy flag) + val rightMergedPartitionKeys = + rewrapKeyMap(mergedPartitionKeys, commonDataTypes, rightReducedDataTypes) + // Now we need to push-down the common partition information to the `GroupPartitionsExec`s. 
- newLeft = applyGroupPartitions(left, leftSpec.joinKeyPositions, mergedPartitionKeys, + newLeft = applyGroupPartitions(left, leftSpec.joinKeyPositions, leftMergedPartitionKeys, leftReducers, distributePartitions = applyPartialClustering && !replicateLeftSide) - newRight = applyGroupPartitions(right, rightSpec.joinKeyPositions, mergedPartitionKeys, + newRight = applyGroupPartitions(right, rightSpec.joinKeyPositions, rightMergedPartitionKeys, rightReducers, distributePartitions = applyPartialClustering && !replicateRightSide) } } @@ -656,6 +682,32 @@ case class EnsureRequirements( } } + private def rewrapKeys( + keys: Seq[InternalRowComparableWrapper], + currentDataTypes: Seq[DataType], + expectedDataType: Seq[DataType]) = { + if (currentDataTypes != expectedDataType) { + val comparableKeyWrapperFactory = + InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(expectedDataType) + keys.map(key => comparableKeyWrapperFactory(key.row)) + } else { + keys + } + } + + private def rewrapKeyMap( + keyMap: Seq[(InternalRowComparableWrapper, Int)], + currentDataTypes: Seq[DataType], + expectedDataType: Seq[DataType]) = { + if (currentDataTypes != expectedDataType) { + val comparableKeyWrapperFactory = + InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(expectedDataType) + keyMap.map { case (key, numParts) => (comparableKeyWrapperFactory(key.row), numParts) } + } else { + keyMap + } + } + // Similar to `OptimizeSkewedJoin.canSplitRightSide` private def canReplicateLeftSide(joinType: JoinType): Boolean = { joinType == Inner || joinType == Cross || joinType == RightOuter diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index f2eb02e03846e..e26ae8d7d7035 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector import java.sql.Timestamp import java.util.Collections -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression} @@ -75,6 +75,20 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with Column.create("dept_id", IntegerType), Column.create("data", StringType)) + def withFunction[T](fn: UnboundFunction)(f: => T): T = { + val id = Identifier.of(Array.empty, fn.name()) + val oldFn = Option.when(catalog.listFunctions(Array.empty).contains(id)) { + val fn = catalog.loadFunction(id) + catalog.dropFunction(id) + fn + } + catalog.createFunction(id, fn) + try f finally { + catalog.dropFunction(id) + oldFn.foreach(catalog.createFunction(id, _)) + } + } + test("clustered distribution: output partitioning should be KeyedPartitioning") { val partitions: Array[Transform] = Array(Expressions.years("ts")) @@ -88,7 +102,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with var df = sql(s"SELECT count(*) FROM testcat.ns.$table GROUP BY ts") val catalystDistribution = physical.ClusteredDistribution( Seq(TransformExpression(YearsFunction, Seq(attr("ts"))))) - val partitionKeys = Seq(50L, 51L, 52L).map(v => InternalRow.fromSeq(Seq(v))) + val partitionKeys = Seq(50, 51, 52).map(v => InternalRow.fromSeq(Seq(v))) checkQueryPlan(df, catalystDistribution, physical.KeyedPartitioning(catalystDistribution.clustering, partitionKeys)) @@ -3242,4 +3256,150 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with "ExpectedPartitionKeys: 2 Reducers: 1 DistributePartitions: false") } } + + test("SPARK-56046: Reducers with same logical result types") { + 
val items_partitions = Array(days("arrive_time")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(0, 'aa', 39.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 41.0, cast('2021-01-03' as timestamp)), " + + s"(3, 'bb', 42.0, cast('2021-01-04' as timestamp))") + + val purchases_partitions = Array(years("time")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(5, 44.0, cast('2020-01-15' as timestamp)), " + + s"(7, 46.5, cast('2021-02-08' as timestamp))") + + Seq(true, false).foreach { allowIncompatibleTransformTypes => + withSQLConf( + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> + allowIncompatibleTransformTypes.toString) { + Seq( + s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time", + s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time" + ).foreach { joinSting => + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting + |ORDER BY id, item_id + |""".stripMargin) + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + assert(shuffles.isEmpty, "should not add shuffle for both sides of the join") + val groupPartitions = collectGroupPartitions(df.queryExecution.executedPlan) + assert(groupPartitions.forall(_.outputPartitioning.numPartitions == 2)) + + checkAnswer(df, Seq(Row(0, 1), Row(1, 1))) + } + } + } + } + + test("SPARK-56046: Reducers with different logical but compatible physical result types") { + withFunction(UnboundDaysFunctionWithCompatiblePhysicalTypeReducer) { + val items_partitions = Array(days("arrive_time")) + createTable(items, itemsColumns, items_partitions) + 
sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(0, 'aa', 39.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 41.0, cast('2021-01-03' as timestamp)), " + + s"(3, 'bb', 42.0, cast('2021-01-04' as timestamp))") + + val purchases_partitions = Array(years("time")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(5, 44.0, cast('2020-01-15' as timestamp)), " + + s"(7, 46.5, cast('2021-02-08' as timestamp))") + + withSQLConf( + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") { + Seq( + s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time", + s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time" + ).foreach { joinSting => + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting + |ORDER BY id, item_id + |""".stripMargin) + + val shuffles = collectShuffles(df.queryExecution.executedPlan) + assert(shuffles.isEmpty, "should not add shuffle for both sides of the join") + val groupPartitions = collectGroupPartitions(df.queryExecution.executedPlan) + assert(groupPartitions.forall(_.outputPartitioning.numPartitions == 2)) + + checkAnswer(df, Seq(Row(0, 1), Row(1, 1))) + + withSQLConf(SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> "false") { + val e = intercept[SparkException] { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting + |ORDER BY id, item_id + |""".stripMargin) + + df.collect() + } + assert(e.getMessage.startsWith( + "Storage-partition join partition transforms produced incompatible reduced types")) + } + } + } + } + } + + test("SPARK-56046: Reducers with different logical and incompatible physical result types") { + 
withFunction(UnboundDaysFunctionWithIncompatiblePhysicalTypeReducer) { + val items_partitions = Array(days("arrive_time")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + s"(0, 'aa', 39.0, cast('2020-01-01' as timestamp)), " + + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + + s"(2, 'bb', 41.0, cast('2021-01-03' as timestamp)), " + + s"(3, 'bb', 42.0, cast('2021-01-04' as timestamp))") + + val purchases_partitions = Array(years("time")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + + s"(5, 44.0, cast('2020-01-15' as timestamp)), " + + s"(7, 46.5, cast('2021-02-08' as timestamp))") + + Seq(true, false).foreach { allowIncompatibleTransformTypes => + withSQLConf( + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> + allowIncompatibleTransformTypes.toString) { + Seq( + s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time", + s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time" + ).foreach { joinSting => + val e = intercept[SparkException] { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting + |ORDER BY id, item_id + |""".stripMargin) + + df.collect() + } + assert(e.getMessage.startsWith( + "Storage-partition join partition transforms produced incompatible reduced types")) + } + } + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala index ed2f81d7e8d6f..8d592b16df004 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala @@ -40,42 +40,137 @@ object UnboundYearsFunction extends UnboundFunction { override def name(): String = "years" } -object YearsFunction extends ScalarFunction[Long] { +object YearsFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] { override def inputTypes(): Array[DataType] = Array(TimestampType) - override def resultType(): DataType = LongType + override def resultType(): DataType = IntegerType override def name(): String = "years" override def canonicalName(): String = name() val UTC: ZoneId = ZoneId.of("UTC") val EPOCH_LOCAL_DATE: LocalDate = Instant.EPOCH.atZone(UTC).toLocalDate - def invoke(ts: Long): Long = { + def invoke(ts: Long): Int = { val localDate = DateTimeUtils.microsToInstant(ts).atZone(UTC).toLocalDate - ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt } + + override def reducer(otherFunction: ReducibleFunction[_, _]): Reducer[Int, Int] = null } -object DaysFunction extends BoundFunction { - override def inputTypes(): Array[DataType] = Array(TimestampType) - override def resultType(): DataType = LongType +abstract class UnboundDaysFunctionBase extends UnboundFunction { + protected def isValidType(dt: DataType): Boolean = dt match { + case DateType | TimestampType => true + case _ => false + } + + override def description(): String = name() override def name(): String = "days" - override def canonicalName(): String = name() } -object UnboundDaysFunction extends UnboundFunction { +object UnboundDaysFunction extends UnboundDaysFunctionBase { override def bind(inputType: StructType): BoundFunction = { if (inputType.size == 1 && isValidType(inputType.head.dataType)) DaysFunction else throw new UnsupportedOperationException( "'days' only take date or timestamp as input type") } +} - private def isValidType(dt: DataType): Boolean = dt match { - case DateType | 
TimestampType => true - case _ => false +object UnboundDaysFunctionWithCompatiblePhysicalTypeReducer extends UnboundDaysFunctionBase { + override def bind(inputType: StructType): BoundFunction = { + if (inputType.size == 1 && isValidType(inputType.head.dataType)) { + DaysFunctionWithCompatiblePhysicalTypeReducer + } else throw new UnsupportedOperationException( + "'days' only take date or timestamp as input type") } +} - override def description(): String = name() +object UnboundDaysFunctionWithIncompatiblePhysicalTypeReducer extends UnboundDaysFunctionBase { + override def bind(inputType: StructType): BoundFunction = { + if (inputType.size == 1 && isValidType(inputType.head.dataType)) { + DaysFunctionWithIncompatiblePhysicalTypeReducer + } else throw new UnsupportedOperationException( + "'days' only take date or timestamp as input type") + } +} + +abstract class DaysFunctionBase extends ScalarFunction[Int] { + override def inputTypes(): Array[DataType] = Array(TimestampType) + override def resultType(): DataType = DateType override def name(): String = "days" + override def canonicalName(): String = name() +} + +// This `days` function reduces `DateType` partitions keys to `IntegerType` partitions keys when +// partitions are reduced to partitions of a `years` function, which produces `IntegerType` keys. +object DaysFunction extends DaysFunctionBase with ReducibleFunction[Int, Int] { + override def reducer(otherFunc: ReducibleFunction[_, _]): Reducer[Int, Int] = { + if (otherFunc == YearsFunction) { + DaysToYearsReducer() + } else { + null + } + } +} + +// This `days` function reduces `DateType` partitions keys to `DateType` partitions keys when +// partitions are reduced to partitions of a `years` function, which produces `IntegerType` keys. +// `DateType` and `IntegerType` share the same `PhysicalDataType`. 
+object DaysFunctionWithCompatiblePhysicalTypeReducer + extends DaysFunctionBase with ReducibleFunction[Int, Int] { + override def reducer(otherFunc: ReducibleFunction[_, _]): Reducer[Int, Int] = { + if (otherFunc == YearsFunction) { + DaysToYearsReducerWithCompatiblePhysicalType() + } else { + null + } + } +} + +// This `days` function reduces `DateType` partitions keys to `LongType` partitions keys when +// partitions are reduced to partitions of a `years` function, which produces `IntegerType` keys. +// `LongType` and `IntegerType` have different `PhysicalDataType`s. +object DaysFunctionWithIncompatiblePhysicalTypeReducer + extends DaysFunctionBase with ReducibleFunction[Int, Long] { + override def reducer(otherFunc: ReducibleFunction[_, _]): Reducer[Int, Long] = { + if (otherFunc == YearsFunction) { + DaysToYearsReducerWithIncompablePhysicalType() + } else { + null + } + } +} + +abstract class DaysToYearsReducerBase { + val UTC: ZoneId = ZoneId.of("UTC") + val EPOCH_LOCAL_DATE: LocalDate = Instant.EPOCH.atZone(UTC).toLocalDate +} + +case class DaysToYearsReducer() extends DaysToYearsReducerBase with Reducer[Int, Int] { + override def reduce(days: Int): Int = { + val localDate = EPOCH_LOCAL_DATE.plusDays(days) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt + } + + override def resultType(): DataType = IntegerType +} + +// No `resultType()` override means that the reduced type is the original `DateType`. 
+case class DaysToYearsReducerWithCompatiblePhysicalType() + extends DaysToYearsReducerBase with Reducer[Int, Int] { + override def reduce(days: Int): Int = { + val localDate = EPOCH_LOCAL_DATE.plusDays(days) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt + } +} + +case class DaysToYearsReducerWithIncompablePhysicalType() + extends DaysToYearsReducerBase with Reducer[Int, Long] { + override def reduce(days: Int): Long = { + val localDate = EPOCH_LOCAL_DATE.plusDays(days) + ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate) + } + + override def resultType(): DataType = LongType } object UnboundBucketFunction extends UnboundFunction { From 494b92393519631b92f00a2e55a4fb54aeebad2c Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 18 Mar 2026 17:32:55 +0100 Subject: [PATCH 2/9] fix config --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5cf833d4cdf9d..11fb4b994d00d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2124,7 +2124,7 @@ object SQLConf { val V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS = buildConf("spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled") .doc("Whether to allow storage-partition join in the case where the partition transforms " + - "are compatible but not identical. This config requires both " + + "are compatible but not identical. This config requires both " + s"${V2_BUCKETING_ENABLED.key} and ${V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key} to be " + s"enabled and ${V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " + "to be disabled." 
@@ -2134,10 +2134,13 @@ object SQLConf { .createWithDefault(false) val V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES = - buildConf("spark.sql.legacy.allowIncompatibleTransformTypes.enabled") - .doc("Whether to allow storage-partition join where the partition transforms produce " + - "incompatible reduced types and use the left partition key type for comparison.") + buildConf("spark.sql.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled") + .doc("Whether to allow storage-partition join where the left and right partition " + + "transforms are reduced to differing logical types and in that case use the left reduced " + + "logical types for comparison. This config requires " + + s"${V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key} to be enabled.") .version("4.2.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) .booleanConf .createWithDefault(true) From e31b3619243c6f1d383b82898aca1729d6cf350e Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 18 Mar 2026 18:03:52 +0100 Subject: [PATCH 3/9] fix expected ordering type of years transform --- .../spark/sql/connector/WriteDistributionAndOrderingSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala index 7c4852c5e22d5..588490e07dfd6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala @@ -1174,7 +1174,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase Invoke( Literal.create(YearsFunction, ObjectType(YearsFunction.getClass)), "invoke", - LongType, + IntegerType, Seq(Cast(attr("day"), TimestampType, Some("America/Los_Angeles"))), Seq(TimestampType), propagateNull = false), From 
a00c069b55b86d00a24567b05dd378e58b709738 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 18 Mar 2026 19:47:30 +0100 Subject: [PATCH 4/9] address review comments --- .../spark/sql/connector/catalog/functions/Reducer.java | 7 +++++-- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java index 6772b454efc7f..fa762257359ab 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java @@ -42,8 +42,11 @@ public interface Reducer { O reduce(I arg); /** - * Returns the {@link DataType data type} of values produced by this function. - * It can return null to signal it doesn't change the input type. + * Returns the {@link DataType data type} of values produced by this reducer. + * + * As a reducer doesn't know the result {@link DataType data type} of the reduced transform + * function, for compatibility reasons it can return null to signal it doesn't change the type of + * partition keys when the keys are reduced. * * @return a data type for values produced by this function. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 11fb4b994d00d..cdcbb9de300f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2124,7 +2124,7 @@ object SQLConf { val V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS = buildConf("spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled") .doc("Whether to allow storage-partition join in the case where the partition transforms " + - "are compatible but not identical. This config requires both " + + "are compatible but not identical. This config requires both " + s"${V2_BUCKETING_ENABLED.key} and ${V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key} to be " + s"enabled and ${V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " + "to be disabled." From c20b3012bbe6a7f3aff32c219c6cb5e3b88cffa7 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 19 Mar 2026 09:19:03 +0100 Subject: [PATCH 5/9] Extract `TypedReducer` from `Reducer` --- .../connector/catalog/functions/Reducer.java | 18 +++----- .../catalog/functions/TypedReducer.java | 45 +++++++++++++++++++ .../plans/physical/partitioning.scala | 4 +- .../functions/transformFunctions.scala | 6 +-- 4 files changed, 55 insertions(+), 18 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java index 7ba4024af68ed..dc568cf4ccbfd 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java @@ -33,8 +33,11 @@ * r1(f_source(x)) = r2(f_target(x)) for all input 
x. * * - * @param reducer input type - * @param reducer output type + *

If the reducer changes the logical Spark {@link DataType data type} of the values it produces, + * implement {@link TypedReducer} instead. + * + * @param the physical Java type of the input + * @param the physical Java type of the output * @since 4.0.0 */ @Evolving @@ -48,15 +51,4 @@ public interface Reducer { default String displayName() { return getClass().getSimpleName(); } - - /** - * Returns the {@link DataType data type} of values produced by this reducer. - * - * As a reducer doesn't know the result {@link DataType data type} of the reduced transform - * function, for compatibility reasons it can return null to signal it doesn't change the type of - * partition keys when the keys are reduced. - * - * @return a data type for values produced by this function. - */ - default DataType resultType() { return null; } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java new file mode 100644 index 0000000000000..89b94d17f6313 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connector.catalog.functions; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.types.DataType; + +/** + * A {@link Reducer} that changes the {@link DataType data type} of the values it produces. + * + *

Implement this interface instead of {@link Reducer} when the reducer produces values of a + * different logical Spark {@link DataType data type} than its input. If a {@link ReducibleFunction} + * returns a {@code TypedReducer} from its {@code reducer()} method, the output type is taken from + * {@link #resultType()}. If it returns a plain {@link Reducer}, the output type is assumed to be + * unchanged. + * + * @param the physical Java type of the input + * @param the physical Java type of the output + * @see Reducer + * @see ReducibleFunction + * @since 4.2.0 + */ +@Evolving +public interface TypedReducer extends Reducer { + /** + * Returns the logical Spark {@link DataType data type} of values produced by this reducer. + * + * @return the data type of values produced by this reducer. + */ + DataType resultType(); +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 7d1c80e3f903e..f81a55fbc771e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -24,7 +24,7 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper -import org.apache.spark.sql.connector.catalog.functions.Reducer +import org.apache.spark.sql.connector.catalog.functions.{Reducer, TypedReducer} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, IntegerType} @@ -591,7 +591,7 @@ object KeyedPartitioning { dataTypes: Seq[DataType], reducers: Seq[Option[Reducer[_, _]]]): Seq[DataType] = { dataTypes.zip(reducers).map { - case (t, Some(reducer: Reducer[Any, Any])) => 
Option(reducer.resultType()).getOrElse(t) + case (t, Some(reducer: TypedReducer[Any, Any])) => reducer.resultType() case (t, _) => t } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala index 3d1b996503cc0..99f0cee2b4dec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala @@ -145,7 +145,7 @@ abstract class DaysToYearsReducerBase { val EPOCH_LOCAL_DATE: LocalDate = Instant.EPOCH.atZone(UTC).toLocalDate } -case class DaysToYearsReducer() extends DaysToYearsReducerBase with Reducer[Int, Int] { +case class DaysToYearsReducer() extends DaysToYearsReducerBase with TypedReducer[Int, Int] { override def reduce(days: Int): Int = { val localDate = EPOCH_LOCAL_DATE.plusDays(days) ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt @@ -154,7 +154,7 @@ case class DaysToYearsReducer() extends DaysToYearsReducerBase with Reducer[Int, override def resultType(): DataType = IntegerType } -// No `resultType()` override means that the reduced type is the original `DateType`. +// Not a `TypedReducer`, so the reduced type is the original `DateType`. 
case class DaysToYearsReducerWithCompatiblePhysicalType() extends DaysToYearsReducerBase with Reducer[Int, Int] { override def reduce(days: Int): Int = { @@ -164,7 +164,7 @@ case class DaysToYearsReducerWithCompatiblePhysicalType() } case class DaysToYearsReducerWithIncompablePhysicalType() - extends DaysToYearsReducerBase with Reducer[Int, Long] { + extends DaysToYearsReducerBase with TypedReducer[Int, Long] { override def reduce(days: Int): Long = { val localDate = EPOCH_LOCAL_DATE.plusDays(days) ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate) From 595d59ebf924bc30bdf47813b2f664d1b97bad07 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 19 Mar 2026 18:38:23 +0100 Subject: [PATCH 6/9] address review comments --- .../connector/catalog/functions/Reducer.java | 1 + .../apache/spark/sql/internal/SQLConf.scala | 5 +- .../KeyGroupedPartitioningSuite.scala | 46 +++++++++---------- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java index dc568cf4ccbfd..9b4f6baad6a8a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java @@ -32,6 +32,7 @@ *

  • More generally, there exists reducer functions r1(x) and r2(x) such that * r1(f_source(x)) = r2(f_target(x)) for all input x.
  • * + * where = means both value and data type match. * *

    If the reducer changes the logical Spark {@link DataType data type} of the values it produces, * implement {@link TypedReducer} instead. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d7f1993e03bb8..3916b6e018858 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2145,7 +2145,8 @@ object SQLConf { .createWithDefault(false) val V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES = - buildConf("spark.sql.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled") + buildConf("spark.sql.legacy.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled") + .internal() .doc("Whether to allow storage-partition join where the left and right partition " + "transforms are reduced to differing logical types and in that case use the left reduced " + "logical types for comparison. 
This config requires " + @@ -2153,7 +2154,7 @@ object SQLConf { .version("4.2.0") .withBindingPolicy(ConfigBindingPolicy.SESSION) .booleanConf - .createWithDefault(true) + .createWithDefault(false) val V2_BUCKETING_PARTITION_FILTER_ENABLED = buildConf("spark.sql.sources.v2.bucketing.partition.filter.enabled") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index e5d18a33197b1..ff55537cc4842 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -3329,33 +3329,33 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time", s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time" ).foreach { joinSting => - val df = sql( - s""" - |${selectWithMergeJoinHint("i", "p")} id, item_id - |FROM $joinSting - |ORDER BY id, item_id - |""".stripMargin) + val e = intercept[SparkException] { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting + |ORDER BY id, item_id + |""".stripMargin) - val shuffles = collectShuffles(df.queryExecution.executedPlan) - assert(shuffles.isEmpty, "should not add shuffle for both sides of the join") - val groupPartitions = collectGroupPartitions(df.queryExecution.executedPlan) - assert(groupPartitions.forall(_.outputPartitioning.numPartitions == 2)) + df.collect() + } + assert(e.getMessage.startsWith( + "Storage-partition join partition transforms produced incompatible reduced types")) - checkAnswer(df, Seq(Row(0, 1), Row(1, 1))) + withSQLConf(SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> "true") { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} id, item_id + |FROM $joinSting 
+ |ORDER BY id, item_id + |""".stripMargin) - withSQLConf(SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> "false") { - val e = intercept[SparkException] { - val df = sql( - s""" - |${selectWithMergeJoinHint("i", "p")} id, item_id - |FROM $joinSting - |ORDER BY id, item_id - |""".stripMargin) + val shuffles = collectShuffles(df.queryExecution.executedPlan) + assert(shuffles.isEmpty, "should not add shuffle for both sides of the join") + val groupPartitions = collectGroupPartitions(df.queryExecution.executedPlan) + assert(groupPartitions.forall(_.outputPartitioning.numPartitions == 2)) - df.collect() - } - assert(e.getMessage.startsWith( - "Storage-partition join partition transforms produced incompatible reduced types")) + checkAnswer(df, Seq(Row(0, 1), Row(1, 1))) } } } From b82df203035d21a9f66114e6d69ff07f6025dd57 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Fri, 20 Mar 2026 15:43:40 +0100 Subject: [PATCH 7/9] simplify solution --- .../connector/catalog/functions/Reducer.java | 10 ++- .../catalog/functions/TypedReducer.java | 45 ---------- .../plans/physical/partitioning.scala | 39 +++----- .../apache/spark/sql/internal/SQLConf.scala | 15 ---- .../datasources/v2/GroupPartitionsExec.scala | 10 +-- .../exchange/EnsureRequirements.scala | 90 +++++-------------- .../KeyGroupedPartitioningSuite.scala | 81 ++--------------- .../functions/transformFunctions.scala | 65 +++----------- 8 files changed, 66 insertions(+), 289 deletions(-) delete mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java index 9b4f6baad6a8a..28520aa56258c 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java +++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java @@ -34,9 +34,6 @@ * * where = means both value and data type match. * - *

    If the reducer changes the logical Spark {@link DataType data type} of the values it produces, - * implement {@link TypedReducer} instead. - * * @param the physical Java type of the input * @param the physical Java type of the output * @since 4.0.0 @@ -52,4 +49,11 @@ public interface Reducer { default String displayName() { return getClass().getSimpleName(); } + + /** + * Returns the {@link DataType data type} of values produced by this reducer. + * + * @return the data type of values produced by this reducer. + */ + DataType resultType(); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java deleted file mode 100644 index 89b94d17f6313..0000000000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/TypedReducer.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.connector.catalog.functions; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.types.DataType; - -/** - * A {@link Reducer} that changes the {@link DataType data type} of the values it produces. - * - *

    Implement this interface instead of {@link Reducer} when the reducer produces values of a - * different logical Spark {@link DataType data type} than its input. If a {@link ReducibleFunction} - * returns a {@code TypedReducer} from its {@code reducer()} method, the output type is taken from - * {@link #resultType()}. If it returns a plain {@link Reducer}, the output type is assumed to be - * unchanged. - * - * @param the physical Java type of the input - * @param the physical Java type of the output - * @see Reducer - * @see ReducibleFunction - * @since 4.2.0 - */ -@Evolving -public interface TypedReducer extends Reducer { - /** - * Returns the logical Spark {@link DataType data type} of values produced by this reducer. - * - * @return the data type of values produced by this reducer. - */ - DataType resultType(); -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index f81a55fbc771e..8b11860d0eb87 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -24,7 +24,7 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper -import org.apache.spark.sql.connector.catalog.functions.{Reducer, TypedReducer} +import org.apache.spark.sql.connector.catalog.functions.Reducer import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, IntegerType} @@ -464,15 +464,12 @@ case class KeyedPartitioning( KeyedPartitioning.projectKeys(partitionKeys, expressionDataTypes, positions) /** - * Reduces this partitioning's partition keys by applying the given reducers and 
use the provided - * types for comparison. - * Returns the distinct reduced keys. + * Reduces this partitioning's partition keys by applying the given reducers. + * Returns the reduced keys and their data types. */ def reduceKeys( - reducers: Seq[Option[Reducer[_, _]]], - reducedDataTypes: Seq[DataType]): Seq[InternalRowComparableWrapper] = - KeyedPartitioning.reduceKeys(partitionKeys, expressionDataTypes, reducers, reducedDataTypes) - .distinct + reducers: Seq[Option[Reducer[_, _]]]): (Seq[DataType], Seq[InternalRowComparableWrapper]) = + KeyedPartitioning.reduceKeys(partitionKeys, expressionDataTypes, reducers) override def satisfies0(required: Distribution): Boolean = { nonGroupedSatisfies(required) || groupedSatisfies(required) @@ -585,29 +582,19 @@ object KeyedPartitioning { } /** - * Reduces a sequence of data types by applying reducers to each position. - */ - def reduceTypes( - dataTypes: Seq[DataType], - reducers: Seq[Option[Reducer[_, _]]]): Seq[DataType] = { - dataTypes.zip(reducers).map { - case (t, Some(reducer: TypedReducer[Any, Any])) => reducer.resultType() - case (t, _) => t - } - } - - /** - * Reduces a sequence of partition keys by applying reducers to each position and using the - * provided types for comparison. + * Reduces a sequence of partition keys by applying reducers to each position. 
*/ def reduceKeys( keys: Seq[InternalRowComparableWrapper], dataTypes: Seq[DataType], - reducers: Seq[Option[Reducer[_, _]]], - reducedDataTypes: Seq[DataType]): Seq[InternalRowComparableWrapper] = { + reducers: Seq[Option[Reducer[_, _]]]): (Seq[DataType], Seq[InternalRowComparableWrapper]) = { + val reducedDataTypes = dataTypes.zip(reducers).map { + case (_, Some(reducer: Reducer[Any, Any])) => reducer.resultType() + case (t, _) => t + } val comparableKeyWrapperFactory = InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(reducedDataTypes) - keys.map { key => + val reducedKeys = keys.map { key => val keyValues = key.row.toSeq(dataTypes) val reducedKey = keyValues.zip(reducers).map { case (v, Some(reducer: Reducer[Any, Any])) => reducer.reduce(v) @@ -615,6 +602,8 @@ object KeyedPartitioning { }.toArray comparableKeyWrapperFactory(new GenericInternalRow(reducedKey)) } + + (reducedDataTypes, reducedKeys) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3916b6e018858..dc5c580f1b08c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2144,18 +2144,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES = - buildConf("spark.sql.legacy.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled") - .internal() - .doc("Whether to allow storage-partition join where the left and right partition " + - "transforms are reduced to differing logical types and in that case use the left reduced " + - "logical types for comparison. 
This config requires " + - s"${V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key} to be enabled.") - .version("4.2.0") - .withBindingPolicy(ConfigBindingPolicy.SESSION) - .booleanConf - .createWithDefault(false) - val V2_BUCKETING_PARTITION_FILTER_ENABLED = buildConf("spark.sql.sources.v2.bucketing.partition.filter.enabled") .doc(s"Whether to filter partitions when running storage-partition join. " + @@ -7742,9 +7730,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def v2BucketingAllowCompatibleTransforms: Boolean = getConf(SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS) - def v2BucketingAllowIncompatibleTransformTypes: Boolean = - getConf(SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES) - def v2BucketingAllowSorting: Boolean = getConf(SQLConf.V2_BUCKETING_SORTING_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala index 0d668726991fc..455f7f85d2b51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala @@ -141,14 +141,8 @@ case class GroupPartitionsExec( )(keyedPartitioning.projectKeys) // Reduce keys if reducers are specified - val (reducedDataTypes, reducedKeys) = reducers match { - case Some(reducers) => - val reducedDataTypes = KeyedPartitioning.reduceTypes(projectedDataTypes, reducers) - val reducedKeys = KeyedPartitioning.reduceKeys(projectedKeys, projectedDataTypes, reducers, - reducedDataTypes) - (reducedDataTypes, reducedKeys) - case _ => (projectedDataTypes, projectedKeys) - } + val (reducedDataTypes, reducedKeys) = reducers.fold((projectedDataTypes, projectedKeys))( + KeyedPartitioning.reduceKeys(projectedKeys, projectedDataTypes, _)) val keyToPartitionIndices = reducedKeys.zipWithIndex.groupMap(_._1)(_._2) diff 
--git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 22da36842b0c1..5f094227d77ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -26,14 +26,12 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.types.PhysicalDataType import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper import org.apache.spark.sql.connector.catalog.functions.Reducer import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.v2.GroupPartitionsExec import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.DataType /** * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]] @@ -513,31 +511,21 @@ case class EnsureRequirements( // transforms to group one side's partitions as well as the common partition values val leftReducers = leftSpec.reducers(rightSpec) val rightReducers = rightSpec.reducers(leftSpec) - val leftReducedDataTypes = leftReducers.fold(leftPartitioning.expressionDataTypes)( - KeyedPartitioning.reduceTypes(leftPartitioning.expressionDataTypes, _)) - val rightReducedDataTypes = rightReducers.fold(rightPartitioning.expressionDataTypes)( - KeyedPartitioning.reduceTypes(rightPartitioning.expressionDataTypes, _)) - if (leftReducedDataTypes != rightReducedDataTypes && ( - !conf.v2BucketingAllowIncompatibleTransformTypes || - leftReducedDataTypes.map(PhysicalDataType(_)) != - rightReducedDataTypes.map(PhysicalDataType(_)))) { - 
throw new SparkException("Storage-partition join partition transforms produced " + - s"incompatible reduced types, left: $leftReducedDataTypes, right: " + - s"$rightReducedDataTypes") + val (leftReducedDataTypes, leftReducedKeys) = leftReducers.fold( + (leftPartitioning.expressionDataTypes, leftPartitioning.partitionKeys) + )(leftPartitioning.reduceKeys) + val (rightReducedDataTypes, rightReducedKeys) = rightReducers.fold( + (rightPartitioning.expressionDataTypes, rightPartitioning.partitionKeys) + )(rightPartitioning.reduceKeys) + if (leftReducedDataTypes != rightReducedDataTypes) { + throw new SparkException("Storage-partition join partition transforms produced " + + s"incompatible reduced types, left: $leftReducedDataTypes, right: " + + s"$rightReducedDataTypes") } - val commonDataTypes = leftReducedDataTypes - val leftReducedKeys = leftReducers.fold(leftPartitioning.partitionKeys)( - leftPartitioning.reduceKeys(_, commonDataTypes)) - // As we use left side reduced types as common types for comparison, the right side - // partitions keys might need a new comparable wrapper (depending on the legacy flag) - val rightReducedKeys = rightReducers.fold( - rewrapKeys(rightPartitioning.partitionKeys, rightReducedDataTypes, commonDataTypes))( - rightPartitioning.reduceKeys(_, commonDataTypes)) // merge values on both sides - var mergedPartitionKeys = - mergePartitions(leftReducedKeys, rightReducedKeys, joinType, leftPartitioning.keyOrdering) - .map((_, 1)) + var mergedPartitionKeys = mergeAndDedupPartitions(leftReducedKeys, rightReducedKeys, + joinType, leftPartitioning.keyOrdering).map((_, 1)) logInfo(log"After merging, there are " + log"${MDC(LogKeys.NUM_PARTITIONS, mergedPartitionKeys.size)} partitions") @@ -647,17 +635,10 @@ case class EnsureRequirements( } } - val leftMergedPartitionKeys = mergedPartitionKeys - // As we used left side reduced types as common types for comparison, the merged partition - // keys that we push doww to the right side might need a new 
comparable wrapper (depending - // on the legacy flag) - val rightMergedPartitionKeys = - rewrapKeyMap(mergedPartitionKeys, commonDataTypes, rightReducedDataTypes) - // Now we need to push-down the common partition information to the `GroupPartitionsExec`s. - newLeft = applyGroupPartitions(left, leftSpec.joinKeyPositions, leftMergedPartitionKeys, + newLeft = applyGroupPartitions(left, leftSpec.joinKeyPositions, mergedPartitionKeys, leftReducers, distributePartitions = applyPartialClustering && !replicateLeftSide) - newRight = applyGroupPartitions(right, rightSpec.joinKeyPositions, rightMergedPartitionKeys, + newRight = applyGroupPartitions(right, rightSpec.joinKeyPositions, mergedPartitionKeys, rightReducers, distributePartitions = applyPartialClustering && !replicateRightSide) } } @@ -682,32 +663,6 @@ case class EnsureRequirements( } } - private def rewrapKeys( - keys: Seq[InternalRowComparableWrapper], - currentDataTypes: Seq[DataType], - expectedDataType: Seq[DataType]) = { - if (currentDataTypes != expectedDataType) { - val comparableKeyWrapperFactory = - InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(expectedDataType) - keys.map(key => comparableKeyWrapperFactory(key.row)) - } else { - keys - } - } - - private def rewrapKeyMap( - keyMap: Seq[(InternalRowComparableWrapper, Int)], - currentDataTypes: Seq[DataType], - expectedDataType: Seq[DataType]) = { - if (currentDataTypes != expectedDataType) { - val comparableKeyWrapperFactory = - InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(expectedDataType) - keyMap.map { case (key, numParts) => (comparableKeyWrapperFactory(key.row), numParts) } - } else { - keyMap - } - } - // Similar to `OptimizeSkewedJoin.canSplitRightSide` private def canReplicateLeftSide(joinType: JoinType): Boolean = { joinType == Inner || joinType == Cross || joinType == RightOuter @@ -804,7 +759,7 @@ case class EnsureRequirements( } /** - * Merge and sort partitions keys for SPJ and optionally enable 
partition filtering. + * Merge, dedup and sort partitions keys for SPJ and optionally enable partition filtering. * Both sides must have matching partition expressions. * @param leftPartitionKeys left side partition keys * @param rightPartitionKeys right side partition keys @@ -812,20 +767,21 @@ case class EnsureRequirements( * @keyOrdering ordering to sort partition keys * @return merged and sorted partition values */ - def mergePartitions( + def mergeAndDedupPartitions( leftPartitionKeys: Seq[InternalRowComparableWrapper], rightPartitionKeys: Seq[InternalRowComparableWrapper], joinType: JoinType, keyOrdering: Ordering[InternalRowComparableWrapper]): Seq[InternalRowComparableWrapper] = { val merged = if (SQLConf.get.getConf(SQLConf.V2_BUCKETING_PARTITION_FILTER_ENABLED)) { joinType match { - case Inner => mergePartitionKeys(leftPartitionKeys, rightPartitionKeys, intersect = true) - case LeftOuter => leftPartitionKeys - case RightOuter => rightPartitionKeys - case _ => mergePartitionKeys(leftPartitionKeys, rightPartitionKeys) + case Inner => + mergeAndDedupPartitionKeys(leftPartitionKeys, rightPartitionKeys, intersect = true) + case LeftOuter => leftPartitionKeys.distinct + case RightOuter => rightPartitionKeys.distinct + case _ => mergeAndDedupPartitionKeys(leftPartitionKeys, rightPartitionKeys) } } else { - mergePartitionKeys(leftPartitionKeys, rightPartitionKeys) + mergeAndDedupPartitionKeys(leftPartitionKeys, rightPartitionKeys) } // SPARK-41471: We keep to order of partitions to make sure the order of @@ -833,7 +789,7 @@ case class EnsureRequirements( merged.sorted(keyOrdering) } - private def mergePartitionKeys( + private def mergeAndDedupPartitionKeys( leftPartitionKeys: Seq[InternalRowComparableWrapper], rightPartitionKeys: Seq[InternalRowComparableWrapper], intersect: Boolean = false) = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index ff55537cc4842..c840582978923 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -3261,7 +3261,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with } } - test("SPARK-56046: Reducers with same logical result types") { + test("SPARK-56046: Reducers with same result types") { val items_partitions = Array(days("arrive_time")) createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + @@ -3277,12 +3277,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with s"(5, 44.0, cast('2020-01-15' as timestamp)), " + s"(7, 46.5, cast('2021-02-08' as timestamp))") - Seq(true, false).foreach { allowIncompatibleTransformTypes => - withSQLConf( + withSQLConf( SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", - SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true", - SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> - allowIncompatibleTransformTypes.toString) { + SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") { Seq( s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time", s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time" @@ -3302,11 +3299,10 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with checkAnswer(df, Seq(Row(0, 1), Row(1, 1))) } } - } } - test("SPARK-56046: Reducers with different logical but compatible physical result types") { - withFunction(UnboundDaysFunctionWithCompatiblePhysicalTypeReducer) { + test("SPARK-56046: Reducers with different result types") { + withFunction(UnboundDaysFunctionWithIncompatibleResultTypeReducer) { val items_partitions = Array(days("arrive_time")) createTable(items, itemsColumns, 
items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + @@ -3330,78 +3326,15 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time" ).foreach { joinSting => val e = intercept[SparkException] { - val df = sql( + sql( s""" |${selectWithMergeJoinHint("i", "p")} id, item_id |FROM $joinSting |ORDER BY id, item_id - |""".stripMargin) - - df.collect() + |""".stripMargin).collect() } assert(e.getMessage.startsWith( "Storage-partition join partition transforms produced incompatible reduced types")) - - withSQLConf(SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> "true") { - val df = sql( - s""" - |${selectWithMergeJoinHint("i", "p")} id, item_id - |FROM $joinSting - |ORDER BY id, item_id - |""".stripMargin) - - val shuffles = collectShuffles(df.queryExecution.executedPlan) - assert(shuffles.isEmpty, "should not add shuffle for both sides of the join") - val groupPartitions = collectGroupPartitions(df.queryExecution.executedPlan) - assert(groupPartitions.forall(_.outputPartitioning.numPartitions == 2)) - - checkAnswer(df, Seq(Row(0, 1), Row(1, 1))) - } - } - } - } - } - - test("SPARK-56046: Reducers with different logical and incompatible physical result types") { - withFunction(UnboundDaysFunctionWithIncompatiblePhysicalTypeReducer) { - val items_partitions = Array(days("arrive_time")) - createTable(items, itemsColumns, items_partitions) - sql(s"INSERT INTO testcat.ns.$items VALUES " + - s"(0, 'aa', 39.0, cast('2020-01-01' as timestamp)), " + - s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + - s"(2, 'bb', 41.0, cast('2021-01-03' as timestamp)), " + - s"(3, 'bb', 42.0, cast('2021-01-04' as timestamp))") - - val purchases_partitions = Array(years("time")) - createTable(purchases, purchasesColumns, purchases_partitions) - sql(s"INSERT INTO testcat.ns.$purchases VALUES " + - s"(1, 42.0, cast('2020-01-01' as timestamp)), " + - s"(5, 44.0, 
cast('2020-01-15' as timestamp)), " + - s"(7, 46.5, cast('2021-02-08' as timestamp))") - - Seq(true, false).foreach { allowIncompatibleTransformTypes => - withSQLConf( - SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", - SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true", - SQLConf.V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES.key -> - allowIncompatibleTransformTypes.toString) { - Seq( - s"testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.time = i.arrive_time", - s"testcat.ns.$purchases p JOIN testcat.ns.$items i ON i.arrive_time = p.time" - ).foreach { joinSting => - val e = intercept[SparkException] { - val df = sql( - s""" - |${selectWithMergeJoinHint("i", "p")} id, item_id - |FROM $joinSting - |ORDER BY id, item_id - |""".stripMargin) - - df.collect() - } - assert(e.getMessage.startsWith( - "Storage-partition join partition transforms produced incompatible reduced types")) - } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala index 99f0cee2b4dec..f9e583ab7a6cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala @@ -75,25 +75,16 @@ object UnboundDaysFunction extends UnboundDaysFunctionBase { } } -object UnboundDaysFunctionWithCompatiblePhysicalTypeReducer extends UnboundDaysFunctionBase { +object UnboundDaysFunctionWithIncompatibleResultTypeReducer extends UnboundDaysFunctionBase { override def bind(inputType: StructType): BoundFunction = { if (inputType.size == 1 && isValidType(inputType.head.dataType)) { - DaysFunctionWithCompatiblePhysicalTypeReducer + DaysFunctionWithIncompatibleResultTypeReducer } else throw new UnsupportedOperationException( "'days' only take date or timestamp as input type") } } 
-object UnboundDaysFunctionWithIncompatiblePhysicalTypeReducer extends UnboundDaysFunctionBase { - override def bind(inputType: StructType): BoundFunction = { - if (inputType.size == 1 && isValidType(inputType.head.dataType)) { - DaysFunctionWithIncompatiblePhysicalTypeReducer - } else throw new UnsupportedOperationException( - "'days' only take date or timestamp as input type") - } -} - -abstract class DaysFunctionBase extends ScalarFunction[Int] { +abstract class DaysFunctionBase extends ScalarFunction[Int] with ReducibleFunction[Int, Int] { override def inputTypes(): Array[DataType] = Array(TimestampType) override def resultType(): DataType = DateType override def name(): String = "days" @@ -102,7 +93,7 @@ abstract class DaysFunctionBase extends ScalarFunction[Int] { // This `days` function reduces `DateType` partitions keys to `IntegerType` partitions keys when // partitions are reduced to partitions of a `years` function, which produces `IntegerType` keys. -object DaysFunction extends DaysFunctionBase with ReducibleFunction[Int, Int] { +object DaysFunction extends DaysFunctionBase { override def reducer(otherFunc: ReducibleFunction[_, _]): Reducer[Int, Int] = { if (otherFunc == YearsFunction) { DaysToYearsReducer() @@ -112,65 +103,34 @@ object DaysFunction extends DaysFunctionBase with ReducibleFunction[Int, Int] { } } -// This `days` function reduces `DateType` partitions keys to `DateType` partitions keys when +// This `days` function reduces `DateType` partition keys to `DateType` partition keys when // partitions are reduced to partitions of a `years` function, which produces `IntegerType` keys. -// `DateType` and `IntegerType` share the same `PhysicalDataType`. 
-object DaysFunctionWithCompatiblePhysicalTypeReducer - extends DaysFunctionBase with ReducibleFunction[Int, Int] { +object DaysFunctionWithIncompatibleResultTypeReducer extends DaysFunctionBase { override def reducer(otherFunc: ReducibleFunction[_, _]): Reducer[Int, Int] = { if (otherFunc == YearsFunction) { - DaysToYearsReducerWithCompatiblePhysicalType() - } else { - null - } - } -} - -// This `days` function reduces `DateType` partitions keys to `LongType` partitions keys when -// partitions are reduced to partitions of a `years` function, which produces `IntegerType` keys. -// `LongType` and `IntegerType` have different `PhysicalDataType`s. -object DaysFunctionWithIncompatiblePhysicalTypeReducer - extends DaysFunctionBase with ReducibleFunction[Int, Long] { - override def reducer(otherFunc: ReducibleFunction[_, _]): Reducer[Int, Long] = { - if (otherFunc == YearsFunction) { - DaysToYearsReducerWithIncompablePhysicalType() + DaysToYearsReducerWithIncompableResultType() } else { null } } } -abstract class DaysToYearsReducerBase { +abstract class DaysToYearsReducerBase extends Reducer[Int, Int] { val UTC: ZoneId = ZoneId.of("UTC") val EPOCH_LOCAL_DATE: LocalDate = Instant.EPOCH.atZone(UTC).toLocalDate -} -case class DaysToYearsReducer() extends DaysToYearsReducerBase with TypedReducer[Int, Int] { override def reduce(days: Int): Int = { val localDate = EPOCH_LOCAL_DATE.plusDays(days) ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt } - - override def resultType(): DataType = IntegerType } -// Not a `TypedReducer`, so the reduced type is the original `DateType`. 
-case class DaysToYearsReducerWithCompatiblePhysicalType() - extends DaysToYearsReducerBase with Reducer[Int, Int] { - override def reduce(days: Int): Int = { - val localDate = EPOCH_LOCAL_DATE.plusDays(days) - ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate).toInt - } +case class DaysToYearsReducer() extends DaysToYearsReducerBase { + override def resultType(): DataType = IntegerType } -case class DaysToYearsReducerWithIncompablePhysicalType() - extends DaysToYearsReducerBase with TypedReducer[Int, Long] { - override def reduce(days: Int): Long = { - val localDate = EPOCH_LOCAL_DATE.plusDays(days) - ChronoUnit.YEARS.between(EPOCH_LOCAL_DATE, localDate) - } - - override def resultType(): DataType = LongType +case class DaysToYearsReducerWithIncompableResultType() extends DaysToYearsReducerBase { + override def resultType(): DataType = DateType } object UnboundBucketFunction extends UnboundFunction { @@ -209,6 +169,7 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, In case class BucketReducer(divisor: Int) extends Reducer[Int, Int] { override def reduce(bucket: Int): Int = bucket % divisor + override def resultType(): DataType = IntegerType override def displayName(): String = toString } From 39f7c60147a7c82c1e107c76a867e793f10d295f Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Fri, 20 Mar 2026 16:39:45 +0100 Subject: [PATCH 8/9] fix typo --- .../sql/connector/catalog/functions/transformFunctions.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala index f9e583ab7a6cf..13e84e32d1dfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala @@ -108,7 +108,7 
@@ object DaysFunction extends DaysFunctionBase { object DaysFunctionWithIncompatibleResultTypeReducer extends DaysFunctionBase { override def reducer(otherFunc: ReducibleFunction[_, _]): Reducer[Int, Int] = { if (otherFunc == YearsFunction) { - DaysToYearsReducerWithIncompableResultType() + DaysToYearsReducerWithIncompatibleResultType() } else { null } @@ -129,7 +129,7 @@ case class DaysToYearsReducer() extends DaysToYearsReducerBase { override def resultType(): DataType = IntegerType } -case class DaysToYearsReducerWithIncompableResultType() extends DaysToYearsReducerBase { +case class DaysToYearsReducerWithIncompatibleResultType() extends DaysToYearsReducerBase { override def resultType(): DataType = DateType } From fbf2630925c8bf0b9420232bb78b8a72f2affe90 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Fri, 20 Mar 2026 16:40:16 +0100 Subject: [PATCH 9/9] extract error class --- .../resources/error/error-conditions.json | 8 ++++++ .../sql/errors/QueryExecutionErrors.scala | 25 +++++++++++++++++++ .../exchange/EnsureRequirements.scala | 10 +++++--- .../KeyGroupedPartitioningSuite.scala | 2 +- 4 files changed, 40 insertions(+), 5 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 2f7d50fe764ae..3c210ca7d985b 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -6541,6 +6541,14 @@ ], "sqlState" : "42601" }, + "STORAGE_PARTITION_JOIN_INCOMPATIBLE_REDUCED_TYPES" : { + "message" : [ + "Storage-partition join partition transforms produced incompatible reduced types,", + "left reducers: returned: ,", + "right reducers: returned: ." + ], + "sqlState" : "42K09" + }, "STREAMING_CHECKPOINT_MISSING_METADATA_FILE" : { "message" : [ "Checkpoint location is in an inconsistent state: the metadata file is missing but offset and/or commit logs contain data. 
Please restore the metadata file or create a new checkpoint directory." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 8985bdb519d19..d32d42f20004c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.trees.{Origin, TreeNode} import org.apache.spark.sql.catalyst.util.{sideBySide, CharsetProvider, DateTimeUtils, FailFastMode, IntervalUtils, MapData} import org.apache.spark.sql.connector.catalog.{CatalogNotFoundException, Table, TableProvider} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.connector.catalog.functions.Reducer import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE @@ -3128,6 +3129,30 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE ) } + def storagePartitionJoinIncompatibleReducedTypesError( + leftReducers: Option[Seq[Option[Reducer[_, _]]]], + leftReducedDataTypes: Seq[DataType], + rightReducers: Option[Seq[Option[Reducer[_, _]]]], + rightReducedDataTypes: Seq[DataType]): Throwable = { + def reducersNames(reducers: Option[Seq[Option[Reducer[_, _]]]]) = { + reducers.toSeq.flatMap(_.map(_.map(_.displayName()).getOrElse("identity"))) + .mkString("[", ", ", "]") + } + + def dataTypeNames(dataTypes: Seq[DataType]) = { + dataTypes.map(toSQLType).mkString("[", ", ", "]") + } + + new SparkException( + errorClass = "STORAGE_PARTITION_JOIN_INCOMPATIBLE_REDUCED_TYPES", + messageParameters = Map( + "leftReducers" -> reducersNames(leftReducers), + "leftReducedDataTypes" -> dataTypeNames(leftReducedDataTypes), + "rightReducers" -> 
reducersNames(rightReducers), + "rightReducedDataTypes" -> dataTypeNames(rightReducedDataTypes)), + cause = null) + } + def notAbsolutePathError(path: Path): SparkException = { SparkException.internalError(s"$path is not absolute path.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 5f094227d77ce..66cc7c90b63e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.exchange import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.SparkException import org.apache.spark.internal.{LogKeys} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ @@ -28,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper import org.apache.spark.sql.connector.catalog.functions.Reducer +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.v2.GroupPartitionsExec import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} @@ -518,9 +518,11 @@ case class EnsureRequirements( (rightPartitioning.expressionDataTypes, rightPartitioning.partitionKeys) )(rightPartitioning.reduceKeys) if (leftReducedDataTypes != rightReducedDataTypes) { - throw new SparkException("Storage-partition join partition transforms produced " + - s"incompatible reduced types, left: $leftReducedDataTypes, right: " + - s"$rightReducedDataTypes") + throw QueryExecutionErrors.storagePartitionJoinIncompatibleReducedTypesError( + leftReducers = 
leftReducers, + leftReducedDataTypes = leftReducedDataTypes, + rightReducers = rightReducers, + rightReducedDataTypes = rightReducedDataTypes) } // merge values on both sides diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index c840582978923..669752d26cff8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -3333,7 +3333,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with |ORDER BY id, item_id |""".stripMargin).collect() } - assert(e.getMessage.startsWith( + assert(e.getMessage.contains( "Storage-partition join partition transforms produced incompatible reduced types")) } }