From 41e9318349291607d4221cec93f05a0dd38e9d60 Mon Sep 17 00:00:00 2001
From: slfan1989
Date: Wed, 18 Feb 2026 09:01:09 +0800
Subject: [PATCH 1/7] [AURON #2015] Add Native Scan Support for Apache Iceberg
 Copy-On-Write Tables.

Signed-off-by: slfan1989
---
 .github/workflows/iceberg.yml                 |  96 +++++++
 auron-build.sh                                |  10 +-
 pom.xml                                       | 215 ++++++++++++++-
 spark-extension/pom.xml                       |   8 +
 .../SparkAuronConfiguration.java              |   6 +
 .../spark/sql/auron/AuronConverters.scala     |   4 +-
 thirdparty/auron-iceberg/pom.xml              |  48 ++++
 ...pache.spark.sql.auron.AuronConvertProvider |  18 ++
 .../iceberg/IcebergConvertProvider.scala      |  74 +++++
 .../auron/iceberg/IcebergScanSupport.scala    | 171 ++++++++++++
 .../plan/NativeIcebergTableScanExec.scala     | 260 ++++++++++++++++++
 .../AuronIcebergIntegrationSuite.scala        | 188 +++++++++++++
 .../auron/iceberg/BaseAuronIcebergSuite.scala |  16 +-
 13 files changed, 1103 insertions(+), 11 deletions(-)
 create mode 100644 .github/workflows/iceberg.yml
 create mode 100644 thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
 create mode 100644 thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala
 create mode 100644 thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala
 create mode 100644 thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala

diff --git a/.github/workflows/iceberg.yml b/.github/workflows/iceberg.yml
new file mode 100644
index 000000000..038968f6b
--- /dev/null
+++ b/.github/workflows/iceberg.yml
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: Iceberg
+
+on:
+  workflow_dispatch:
+  push:
+    branches:
+      - master
+      - branch-*
+  pull_request:
+    branches:
+      - master
+      - branch-*
+
+concurrency:
+  group: iceberg-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  test-iceberg:
+    name: Test Iceberg (${{ matrix.sparkver }} / JDK${{ matrix.javaver }} / Scala${{ matrix.scalaver }})
+    runs-on: ubuntu-24.04
+    strategy:
+      fail-fast: false
+      matrix:
+        include:
+          - sparkver: "3.4"
+            scalaver: "2.12"
+            javaver: "17"
+          - sparkver: "3.5"
+            scalaver: "2.12"
+            javaver: "17"
+          - sparkver: "3.5"
+            scalaver: "2.12"
+            javaver: "21"
+          - sparkver: "4.0"
+            scalaver: "2.13"
+            javaver: "21"
+
+    steps:
+      - name: Checkout Auron
+        uses: actions/checkout@v6
+
+      - name: Setup Java and Maven cache
+        uses: actions/setup-java@v5
+        with:
+          distribution: 'adopt-hotspot'
+          java-version: ${{ matrix.javaver }}
+          cache: 'maven'
+
+      - name: Build dependencies (skip tests)
+        run: >
+          ./build/mvn -B install
+          -pl thirdparty/auron-iceberg
+          -am
+          -Pscala-${{ matrix.scalaver }}
+          -Pspark-${{ matrix.sparkver }}
+          -Piceberg
+          -DicebergEnabled=true
+          -DicebergVersion=1.10.1
+          -DskipBuildNative
+          -DskipTests
+
+      - name: Test Iceberg Module
+        run: >
+          ./build/mvn -B test
+          -pl thirdparty/auron-iceberg
+          -Pscala-${{ matrix.scalaver }}
+          -Pspark-${{ matrix.sparkver }}
+          -Piceberg
+          -DicebergEnabled=true
+          -DicebergVersion=1.10.1
+          -DskipBuildNative
+
+      - name: Upload reports
+        if: failure()
+        uses: actions/upload-artifact@v6
+        with:
+          name: auron-iceberg-${{ matrix.sparkver }}-jdk${{ matrix.javaver }}-test-report
+          path: thirdparty/auron-iceberg/target/surefire-reports
diff --git a/auron-build.sh b/auron-build.sh
index 47e4a5eab..413698fa5 100755
--- a/auron-build.sh
+++ b/auron-build.sh
@@ -37,7 +37,7 @@ SUPPORTED_CELEBORN_VERSIONS=("0.5" "0.6")
 SUPPORTED_UNIFFLE_VERSIONS=("0.10")
 SUPPORTED_PAIMON_VERSIONS=("1.2")
 SUPPORTED_FLINK_VERSIONS=("1.18")
-SUPPORTED_ICEBERG_VERSIONS=("1.9")
+SUPPORTED_ICEBERG_VERSIONS=("1.10.1")
 
 # -----------------------------------------------------------------------------
 # Function: print_help
@@ -270,11 +270,11 @@ while [[ $# -gt 0 ]]; do
         print_invalid_option_error Iceberg "$ICEBERG_VER" "${SUPPORTED_ICEBERG_VERSIONS[@]}"
       fi
       if [ -z "$SPARK_VER" ]; then
-        echo "ERROR: Building iceberg requires spark at the same time, and only Spark versions 3.4 or 3.5 are supported."
+        echo "ERROR: Building iceberg requires spark at the same time, and only Spark versions 3.4 to 4.0 are supported."
        exit 1
      fi
-      if [ "$SPARK_VER" != "3.4" ] && [ "$SPARK_VER" != "3.5" ]; then
-        echo "ERROR: Building iceberg requires spark versions are 3.4 or 3.5."
+      if [ "$SPARK_VER" != "3.4" ] && [ "$SPARK_VER" != "3.5" ] && [ "$SPARK_VER" != "4.0" ]; then
+        echo "ERROR: Building iceberg requires spark versions are 3.4, 3.5 or 4.0."
exit 1 fi shift 2 @@ -393,7 +393,7 @@ if [[ -n "$FLINK_VER" ]]; then BUILD_ARGS+=("-Pflink-$FLINK_VER") fi if [[ -n "$ICEBERG_VER" ]]; then - BUILD_ARGS+=("-Piceberg-$ICEBERG_VER") + BUILD_ARGS+=("-Piceberg" "-DicebergEnabled=true" "-DicebergVersion=$ICEBERG_VER") fi MVN_ARGS=("${CLEAN_ARGS[@]}" "${BUILD_ARGS[@]}") diff --git a/pom.xml b/pom.xml index c93da9234..b9bfd6871 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,11 @@ 2.30.0 1.3.0 3.1.0 + 1.10.1 + false + pre + false + 3.6.0 2.1.1 @@ -206,6 +211,16 @@ arrow-compression ${arrowVersion} + + org.apache.arrow + arrow-memory-core + ${arrowVersion} + + + org.apache.arrow + arrow-memory-netty + ${arrowVersion} + org.apache.arrow arrow-memory-unsafe @@ -779,6 +794,32 @@ 3.0 4.1.47.Final + + + + org.apache.maven.plugins + maven-enforcer-plugin + ${maven-enforcer-plugin.version} + + + spark30-disallow-iceberg + + enforce + + + + + icebergEnabled + false + Spark 3.0 is not supported for Iceberg. Use Spark 3.4-4.0 or disable icebergEnabled. + + + + + + + + @@ -790,6 +831,32 @@ 3.1 4.1.51.Final + + + + org.apache.maven.plugins + maven-enforcer-plugin + ${maven-enforcer-plugin.version} + + + spark31-disallow-iceberg + + enforce + + + + + icebergEnabled + false + Spark 3.1 is not supported for Iceberg. Use Spark 3.4-4.0 or disable icebergEnabled. + + + + + + + + @@ -801,6 +868,32 @@ 3.2 4.1.68.Final + + + + org.apache.maven.plugins + maven-enforcer-plugin + ${maven-enforcer-plugin.version} + + + spark32-disallow-iceberg + + enforce + + + + + icebergEnabled + false + Spark 3.2 is not supported for Iceberg. Use Spark 3.4-4.0 or disable icebergEnabled. + + + + + + + + @@ -812,6 +905,32 @@ 3.3 4.1.74.Final + + + + org.apache.maven.plugins + maven-enforcer-plugin + ${maven-enforcer-plugin.version} + + + spark33-disallow-iceberg + + enforce + + + + + icebergEnabled + false + Spark 3.3 is not supported for Iceberg. Use Spark 3.4-4.0 or disable icebergEnabled. + + + + + + + + @@ -822,7 +941,34 @@ 3.4.4 3.4 4.1.87.Final + 1.10.1 + + + + org.apache.maven.plugins + maven-enforcer-plugin + ${maven-enforcer-plugin.version} + + + spark34-enforce-iceberg-version + + enforce + + + + + icebergVersion + 1\.10\.1 + Spark 3.4 requires Iceberg 1.10.1. Current: ${icebergVersion} + + + + + + + + @@ -833,7 +979,34 @@ 3.5.8 3.5 4.1.96.Final + 1.10.1 + + + + org.apache.maven.plugins + maven-enforcer-plugin + ${maven-enforcer-plugin.version} + + + spark35-enforce-iceberg-version + + enforce + + + + + icebergVersion + 1\.10\.1 + Spark 3.5 requires Iceberg 1.10.1. Current: ${icebergVersion} + + + + + + + + @@ -844,6 +1017,7 @@ 4.0.2 4.0 4.1.111.Final + 1.10.1 @@ -872,6 +1046,21 @@ + + spark40-enforce-iceberg-version + + enforce + + + + + icebergVersion + 1\.10\.1 + Spark 4.0 requires Iceberg 1.10.1. Current: ${icebergVersion} + + + + @@ -914,6 +1103,21 @@ + + spark41-disallow-iceberg + + enforce + + + + + icebergEnabled + false + Spark 4.1 is not supported for Iceberg. Use Spark 3.4-4.0 or disable icebergEnabled. 
+ + + + @@ -1110,13 +1314,16 @@ - iceberg-1.9 + iceberg + + + icebergEnabled + true + + thirdparty/auron-iceberg - - 1.9.2 - diff --git a/spark-extension/pom.xml b/spark-extension/pom.xml index 0d6cf7c1e..7d26257c9 100644 --- a/spark-extension/pom.xml +++ b/spark-extension/pom.xml @@ -80,6 +80,14 @@ org.apache.arrow arrow-compression + + org.apache.arrow + arrow-memory-core + + + org.apache.arrow + arrow-memory-netty + org.apache.arrow arrow-memory-unsafe diff --git a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java index 8846700f1..3cc1e534a 100644 --- a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java +++ b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java @@ -346,6 +346,12 @@ public class SparkAuronConfiguration extends AuronConfiguration { .withDescription("Enable PaimonScanExec operation conversion to native Auron implementations.") .withDefaultValue(true); + public static final ConfigOption ENABLE_ICEBERG_SCAN = new SQLConfOption<>(Boolean.class) + .withKey("auron.enable.iceberg.scan") + .withCategory("Operator Supports") + .withDescription("Enable Iceberg scan operation conversion to native Auron implementations.") + .withDefaultValue(true); + public static final ConfigOption ENABLE_PROJECT = new SQLConfOption<>(Boolean.class) .withKey("auron.enable.project") .withCategory("Operator Supports") diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index def645d51..8ff41836b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -126,7 +126,9 @@ object AuronConverters extends Logging { Seq("AuronShuffleManager", "AuronUniffleShuffleManager", "AuronCelebornShuffleManager") def supportedShuffleManager: Boolean = { - val name = SQLConf.get.getConfString(config.SHUFFLE_MANAGER.key) + val name = SQLConf.get.getConfString( + config.SHUFFLE_MANAGER.key, + config.SHUFFLE_MANAGER.defaultValueString) supportedShuffleManagers.exists(name.contains) } diff --git a/thirdparty/auron-iceberg/pom.xml b/thirdparty/auron-iceberg/pom.xml index 44b247ae0..bb686f481 100644 --- a/thirdparty/auron-iceberg/pom.xml +++ b/thirdparty/auron-iceberg/pom.xml @@ -31,6 +31,11 @@ Apache Auron Iceberg ${icebergVersion} ${scalaVersion} + + org.apache.auron + spark-extension_${scalaVersion} + ${project.version} + org.apache.iceberg iceberg-spark-runtime-${shortSparkVersion}_${scalaVersion} @@ -70,6 +75,49 @@ spark-sql_${scalaVersion} test-jar + + org.apache.auron + spark-extension-shims-spark_${scalaVersion} + ${project.version} + test + + + + + org.apache.maven.plugins + maven-enforcer-plugin + ${maven-enforcer-plugin.version} + + + iceberg-spark-version-compat + + enforce + + + + + icebergEnabled + true + Iceberg module requires icebergEnabled=true. + + + shortSparkVersion + 3\.4|3\.5|4\.0 + Iceberg integration supports Spark 3.4-4.0 only. Current: ${shortSparkVersion} + + + icebergVersion + 1\.10\.1 + Iceberg integration supports only Iceberg 1.10.1. 
Current: ${icebergVersion} + + + + + + + + + diff --git a/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider b/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider new file mode 100644 index 000000000..409e48d68 --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.spark.sql.auron.iceberg.IcebergConvertProvider diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala new file mode 100644 index 000000000..e6e218661 --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.auron.iceberg + +import org.apache.spark.SPARK_VERSION +import org.apache.spark.internal.Logging +import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.auron.plan.NativeIcebergTableScanExec +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec + +import org.apache.auron.spark.configuration.SparkAuronConfiguration +import org.apache.auron.util.SemanticVersion + +class IcebergConvertProvider extends AuronConvertProvider with Logging { + + override def isEnabled: Boolean = { + val enabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get() + if (!enabled) { + return false + } + if (!sparkCompatible) { + logWarning( + s"Disable Iceberg native scan: Spark $SPARK_VERSION is not supported. 
" + + s"Supported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).") + return false + } + true + } + + override def isSupported(exec: SparkPlan): Boolean = { + exec match { + case e: BatchScanExec => IcebergScanSupport.plan(e).nonEmpty + case _ => false + } + } + + override def convert(exec: SparkPlan): SparkPlan = { + exec match { + case e: BatchScanExec => + IcebergScanSupport.plan(e) match { + case Some(plan) => + AuronConverters.addRenameColumnsExec(NativeIcebergTableScanExec(e, plan)) + case None => + exec + } + case _ => exec + } + } + + private lazy val sparkCompatible: Boolean = { + SemanticVersion(SPARK_VERSION) >= "3.4" && SemanticVersion(SPARK_VERSION) < "4.1" + } + + private def icebergVersionOrUnknown: String = { + val pkg = classOf[org.apache.iceberg.Table].getPackage + val version = if (pkg != null) pkg.getImplementationVersion else null + Option(version).getOrElse("unknown") + } +} diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala new file mode 100644 index 000000000..7386161e7 --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.auron.iceberg + +import scala.collection.JavaConverters._ + +import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns} +import org.apache.iceberg.expressions.Expressions +import org.apache.spark.internal.Logging +import org.apache.spark.sql.auron.NativeConverters +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.types.StructType + +final case class IcebergScanPlan( + fileTasks: Seq[FileScanTask], + fileFormat: FileFormat, + readSchema: StructType) + +object IcebergScanSupport extends Logging { + + def plan(exec: BatchScanExec): Option[IcebergScanPlan] = { + val scan = exec.scan + val scanClassName = scan.getClass.getName + // Only handle Iceberg scans; other sources must stay on Spark's path. + if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) { + return None + } + + // Changelog scan carries row-level changes; not supported by native COW-only path. + if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan") { + return None + } + + val readSchema = scan.readSchema + // Native scan does not support Iceberg metadata columns (e.g. _file, _pos). 
+ if (hasMetadataColumns(readSchema)) { + return None + } + + if (!readSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) { + return None + } + + val partitions = inputPartitions(exec) + // Empty scan (e.g. empty table) should still build a plan to return no rows. + if (partitions.isEmpty) { + return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema)) + } + + val icebergPartitions = partitions.flatMap(icebergPartition) + // All partitions must be Iceberg SparkInputPartition; otherwise fallback. + if (icebergPartitions.size != partitions.size) { + return None + } + + val fileTasks = icebergPartitions.flatMap(_.fileTasks) + + // Native scan does not apply delete files; only allow pure data files (COW). + if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) { + return None + } + + // Residual filters require row-level evaluation, not supported in native scan. + if (!fileTasks.forall(task => Expressions.alwaysTrue().equals(task.residual()))) { + return None + } + + // Native scan handles a single file format; mixed formats must fallback. + val formats = fileTasks.map(_.file().format()).distinct + if (formats.size > 1) { + return None + } + + val format = formats.headOption.getOrElse(FileFormat.PARQUET) + if (format != FileFormat.PARQUET && format != FileFormat.ORC) { + return None + } + + Some(IcebergScanPlan(fileTasks, format, readSchema)) + } + + private def hasMetadataColumns(schema: StructType): Boolean = + schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name)) + + private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = { + // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection. + val fromBatch = + try { + val batch = exec.scan.toBatch + if (batch != null) { + batch.planInputPartitions().toSeq + } else { + Seq.empty + } + } catch { + case _: Throwable => Seq.empty + } + if (fromBatch.nonEmpty) { + return fromBatch + } + + // Some Spark versions expose partitions through inputPartitions/partitions methods on BatchScanExec. + val methods = exec.getClass.getMethods + val inputPartitionsMethod = methods.find(_.getName == "inputPartitions") + val partitionsMethod = methods.find(_.getName == "partitions") + + val raw = inputPartitionsMethod + .orElse(partitionsMethod) + .map(_.invoke(exec)) + .getOrElse(Seq.empty) + + // Normalize to Seq[InputPartition], flattening nested Seq if needed. + raw match { + case seq: scala.collection.Seq[_] + if seq.nonEmpty && + seq.head.isInstanceOf[scala.collection.Seq[_]] => + seq.asInstanceOf[scala.collection.Seq[scala.collection.Seq[InputPartition]]].flatten.toSeq + case seq: scala.collection.Seq[_] => + seq.asInstanceOf[scala.collection.Seq[InputPartition]].toSeq + case _ => + Seq.empty + } + } + + private case class IcebergPartitionView(fileTasks: Seq[FileScanTask]) + + private def icebergPartition(partition: InputPartition): Option[IcebergPartitionView] = { + val className = partition.getClass.getName + // Only accept Iceberg SparkInputPartition to access task groups. + if (className != "org.apache.iceberg.spark.source.SparkInputPartition") { + return None + } + + try { + // SparkInputPartition is package-private; use reflection to read its task group. + val taskGroupField = partition.getClass.getDeclaredField("taskGroup") + taskGroupField.setAccessible(true) + val taskGroup = taskGroupField.get(partition) + + // Extract tasks and keep only file scan tasks. 
+ val tasksMethod = taskGroup.getClass.getDeclaredMethod("tasks") + tasksMethod.setAccessible(true) + val tasks = tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]].asScala + val fileTasks = tasks.collect { case task: FileScanTask => task }.toSeq + + // If any task is not a FileScanTask, fallback. + if (fileTasks.size != tasks.size) { + return None + } + + Some(IcebergPartitionView(fileTasks)) + } catch { + case _: ReflectiveOperationException => None + } + } +} diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala new file mode 100644 index 000000000..63927e0b0 --- /dev/null +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.auron.plan + +import java.net.URI +import java.security.PrivilegedExceptionAction +import java.util.Locale +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.FileSystem +import org.apache.iceberg.{FileFormat, FileScanTask} +import org.apache.spark.Partition +import org.apache.spark.TaskContext +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims} +import org.apache.spark.sql.auron.iceberg.IcebergScanPlan +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +import org.apache.auron.{protobuf => pb} +import org.apache.auron.jni.JniBridge +import org.apache.auron.metric.SparkMetricNode + +case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergScanPlan) + extends LeafExecNode + with NativeSupports + with Logging { + + override lazy val metrics: Map[String, SQLMetric] = + NativeHelper.getNativeFileScanMetrics(sparkContext) + + override val output = basedScan.output + override val outputPartitioning = basedScan.outputPartitioning + + private lazy val readSchema: StructType = plan.readSchema + private lazy val fileTasks: Seq[FileScanTask] = 
plan.fileTasks + + private lazy val partitions: Array[FilePartition] = buildFilePartitions() + private lazy val fileSizes: Map[String, Long] = buildFileSizes() + + private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(readSchema) + private lazy val nativePartitionSchema: pb.Schema = + NativeConverters.convertSchema(StructType(Nil)) + + private lazy val caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis + + private lazy val fieldIndexByName: Map[String, Int] = { + if (caseSensitive) { + readSchema.fieldNames.zipWithIndex.toMap + } else { + readSchema.fieldNames.map(_.toLowerCase(Locale.ROOT)).zipWithIndex.toMap + } + } + + private def fieldIndexFor(name: String): Int = { + if (caseSensitive) { + fieldIndexByName.getOrElse(name, readSchema.fieldIndex(name)) + } else { + fieldIndexByName.getOrElse(name.toLowerCase(Locale.ROOT), readSchema.fieldIndex(name)) + } + } + + private lazy val projection: Seq[Integer] = + output.map(attr => Integer.valueOf(fieldIndexFor(attr.name))) + + private lazy val nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) => + { + val nativePartitionedFile = (file: PartitionedFile) => { + val filePath = file.filePath.toString + val size = fileSizes.getOrElse(filePath, file.length) + pb.PartitionedFile + .newBuilder() + .setPath(filePath) + .setSize(size) + .setLastModifiedNs(0) + .setRange( + pb.FileRange + .newBuilder() + .setStart(file.start) + .setEnd(file.start + file.length) + .build()) + .build() + } + pb.FileGroup + .newBuilder() + .addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava) + .build() + } + + override def doExecuteNative(): NativeRDD = { + if (partitions.isEmpty) { + return new EmptyNativeRDD(sparkContext) + } + + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + + val fileFormat = plan.fileFormat + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + // Build per-partition native scan plans and execute them via NativeRDD. + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + // Register the Hadoop conf for native readers via a per-task resource id. + val resourceId = s"NativeIcebergTableScan:${UUID.randomUUID().toString}" + putJniBridgeResource(resourceId, broadcastedHadoopConf) + + // Convert Spark FilePartition to native FileGroup. + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + + // Choose a native scan node based on file format. + if (fileFormat == FileFormat.ORC) { + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + // No pruning predicates are pushed down in the native scan yet. 
+ .addAllPruningPredicates(new java.util.ArrayList()) + + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(nativeOrcScanExecBuilder.build()) + .build() + } else { + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + // No pruning predicates are pushed down in the native scan yet. + .addAllPruningPredicates(new java.util.ArrayList()) + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + } + }, + friendlyName = "NativeRDD.IcebergScan") + } + + override val nodeName: String = "NativeIcebergTableScan" + + // Delegate canonicalization to the original scan to keep plan equivalence checks consistent. + override protected def doCanonicalize(): SparkPlan = basedScan.canonicalized + + private def buildFileSizes(): Map[String, Long] = { + // Map file path to full file size; tasks may split a file into multiple ranges. + fileTasks + .map(task => task.file().location() -> task.file().fileSizeInBytes()) + .groupBy(_._1) + .mapValues(_.head._2) + .toMap + } + + private def buildFilePartitions(): Array[FilePartition] = { + // Convert Iceberg file tasks into Spark FilePartition groups for execution. + if (fileTasks.isEmpty) { + return Array.empty + } + + val sparkSession = Shims.get.getSqlContext(basedScan).sparkSession + val maxSplitBytes = getMaxSplitBytes(sparkSession, fileTasks) + val partitionedFiles = fileTasks + .map { task => + val filePath = task.file().location() + Shims.get.getPartitionedFile(InternalRow.empty, filePath, task.start(), task.length()) + } + .sortBy(_.length)(Ordering[Long].reverse) + .toSeq + + FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes).toArray + } + + private def getMaxSplitBytes(sparkSession: SparkSession, tasks: Seq[FileScanTask]): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + val minPartitionNum = Shims.get.getMinPartitionNum(sparkSession) + val totalBytes = tasks + .map(task => task.file().fileSizeInBytes() + openCostInBytes) + .sum + val bytesPerCore = if (minPartitionNum > 0) totalBytes / minPartitionNum else totalBytes + + Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + } + + private def putJniBridgeResource( + resourceId: String, + broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = { + val sharedConf = broadcastedHadoopConf.value.value + JniBridge.putResource( + resourceId, + (location: String) => { + val getFsTimeMetric = metrics("io_time_getfs") + val currentTimeMillis = System.currentTimeMillis() + val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] { + override def run(): FileSystem = FileSystem.get(new URI(location), sharedConf) + }) + getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000) + fs + }) + } + + private def broadcastedHadoopConf: Broadcast[SerializableConfiguration] = { + val sparkSession = Shims.get.getSqlContext(basedScan).sparkSession + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(Map.empty) + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + } +} diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index d16d47b4f..1340795e6 100644 --- 
a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
@@ -16,6 +16,14 @@
  */
 package org.apache.auron.iceberg
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.iceberg.{FileFormat, FileScanTask}
+import org.apache.iceberg.data.{GenericAppenderFactory, Record}
+import org.apache.iceberg.deletes.PositionDelete
+import org.apache.iceberg.spark.Spark3Util
 import org.apache.spark.sql.Row
 
 class AuronIcebergIntegrationSuite
@@ -31,4 +39,184 @@ class AuronIcebergIntegrationSuite
     }
   }
 
+  test("iceberg native scan is applied for simple COW table") {
+    withTable("local.db.t2") {
+      sql("create table local.db.t2 using iceberg as select 1 as id, 'a' as v")
+      val df = sql("select * from local.db.t2")
+      df.collect()
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg native scan is applied for projection on COW table") {
+    withTable("local.db.t3") {
+      sql("create table local.db.t3 using iceberg as select 1 as id, 'a' as v")
+      val df = sql("select id from local.db.t3")
+      checkAnswer(df, Seq(Row(1)))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg native scan is applied for partitioned COW table with filter") {
+    withTable("local.db.t_partition") {
+      sql("""
+        |create table local.db.t_partition (id int, v string, p string)
+        |using iceberg
+        |partitioned by (p)
+        |""".stripMargin)
+      sql("insert into local.db.t_partition values (1, 'a', 'p1'), (2, 'b', 'p2')")
+      val df = sql("select * from local.db.t_partition where p = 'p1'")
+      checkAnswer(df, Seq(Row(1, "a", "p1")))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg native scan is applied for ORC COW table") {
+    withTable("local.db.t_orc") {
+      sql("""
+        |create table local.db.t_orc (id int, v string)
+        |using iceberg
+        |tblproperties ('write.format.default' = 'orc')
+        |""".stripMargin)
+      sql("insert into local.db.t_orc values (1, 'a'), (2, 'b')")
+      val df = sql("select * from local.db.t_orc")
+      checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg native scan is applied when delete files are null (format v1)") {
+    withTable("local.db.t_v1") {
+      sql("""
+        |create table local.db.t_v1 (id int, v string)
+        |using iceberg
+        |tblproperties ('format-version' = '1')
+        |""".stripMargin)
+      sql("insert into local.db.t_v1 values (1, 'a'), (2, 'b')")
+      val icebergTable = Spark3Util.loadIcebergTable(spark, "local.db.t_v1")
+      val scanTasks = icebergTable.newScan().planFiles()
+      val allDeletesNull =
+        try {
+          scanTasks
+            .iterator()
+            .asScala
+            .forall(task => task.deletes() == null)
+        } finally {
+          scanTasks.close()
+        }
+      assert(allDeletesNull)
+      val df = sql("select * from local.db.t_v1")
+      checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg scan falls back for residual filters on data columns") {
+    withTable("local.db.t_residual") {
+      sql("create table local.db.t_residual (id int, v string) using iceberg")
+      sql("insert into local.db.t_residual values (1, 'a'), (2, 'b')")
+      val df = sql("select * from local.db.t_residual where v = 'a'")
+      checkAnswer(df, Seq(Row(1, "a")))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(!plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg scan falls back when reading metadata columns") {
+    withTable("local.db.t4") {
+      sql("create table local.db.t4 using iceberg as select 1 as id, 'a' as v")
+      val df = sql("select _file from local.db.t4")
+      df.collect()
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(!plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg scan falls back for unsupported decimal types") {
+    withTable("local.db.t5") {
+      sql("create table local.db.t5 (id int, amount decimal(38, 10)) using iceberg")
+      sql("insert into local.db.t5 values (1, 123.45)")
+      val df = sql("select * from local.db.t5")
+      checkAnswer(df, Seq(Row(1, new java.math.BigDecimal("123.4500000000"))))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(!plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg scan falls back when delete files exist") {
+    withTable("local.db.t_delete") {
+      sql("""
+        |create table local.db.t_delete (id int, v string)
+        |using iceberg
+        |tblproperties (
+        |  'format-version' = '2',
+        |  'write.delete.mode' = 'merge-on-read'
+        |)
+        |""".stripMargin)
+      sql("insert into local.db.t_delete values (1, 'a'), (2, 'b')")
+      addPositionDeleteFile("local.db.t_delete")
+      val icebergTable = Spark3Util.loadIcebergTable(spark, "local.db.t_delete")
+      val scanTasks = icebergTable.newScan().planFiles()
+      val hasDeletes =
+        try {
+          scanTasks
+            .iterator()
+            .asScala
+            .exists(task => task.deletes() != null && !task.deletes().isEmpty)
+        } finally {
+          scanTasks.close()
+        }
+      assert(hasDeletes)
+      val df = sql("select * from local.db.t_delete")
+      df.collect()
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(!plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg scan is disabled via spark.auron.enable.iceberg.scan") {
+    withTable("local.db.t_disable") {
+      sql("create table local.db.t_disable using iceberg as select 1 as id, 'a' as v")
+      withSQLConf("spark.auron.enable.iceberg.scan" -> "false") {
+        assert(
+          !org.apache.auron.spark.configuration.SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get())
+        val df = sql("select * from local.db.t_disable")
+        df.collect()
+        val plan = df.queryExecution.executedPlan.toString()
+        assert(!plan.contains("NativeIcebergTableScan"))
+      }
+    }
+  }
+
+  private def addPositionDeleteFile(tableName: String): Unit = {
+    val table = Spark3Util.loadIcebergTable(spark, tableName)
+    val taskIterable = table.newScan().planFiles()
+    val taskIter = taskIterable.iterator()
+    if (!taskIter.hasNext) {
+      taskIterable.close()
+      return
+    }
+
+    val task = taskIter.next().asInstanceOf[FileScanTask]
+    val deletePath =
+      table.locationProvider().newDataLocation(s"delete-${UUID.randomUUID().toString}.parquet")
+    val outputFile = table.io().newOutputFile(deletePath)
+    val encryptedOutput = table.encryption().encrypt(outputFile)
+    val appenderFactory = new GenericAppenderFactory(table.schema(), table.spec())
+    val writer =
+      appenderFactory.newPosDeleteWriter(encryptedOutput, FileFormat.PARQUET, task.partition())
+
+    val delete = PositionDelete.create[Record]().set(task.file().location(), 0L, null)
+    writer.write(delete)
+    writer.close()
+
+    val deleteFile = writer.toDeleteFile()
+    table.newRowDelta().addDeletes(deleteFile).commit()
+    taskIterable.close()
+  }
 }
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
index 59ffbd5a2..cfe7070ac 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
@@ -22,14 +22,28 @@ import org.apache.spark.sql.test.SharedSparkSession
 trait BaseAuronIcebergSuite extends SharedSparkSession {
 
   override protected def sparkConf: SparkConf = {
+    val extraJavaOptions =
+      "--add-opens=java.base/java.lang=ALL-UNNAMED " +
+        "--add-opens=java.base/java.nio=ALL-UNNAMED " +
+        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " +
+        "-Dio.netty.tryReflectionSetAccessible=true"
     super.sparkConf
       .set(
         "spark.sql.extensions",
-        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+        "org.apache.spark.sql.auron.AuronSparkSessionExtension," +
+          "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
       .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
       .set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.local.type", "hadoop")
      .set("spark.sql.catalog.local.warehouse", "iceberg_warehouse")
+      .set("spark.auron.enabled", "true")
+      .set(
+        "spark.shuffle.manager",
+        "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
+      .set("spark.auron.enable.shuffleExchange", "true")
+      .set("spark.auron.enable.project", "false")
       .set("spark.ui.enabled", "false")
+      .set("spark.driver.extraJavaOptions", extraJavaOptions)
+      .set("spark.executor.extraJavaOptions", extraJavaOptions)
   }
 }

From af511fb62e7f575d916558238c6c5db41c0970ea Mon Sep 17 00:00:00 2001
From: slfan1989
Date: Wed, 18 Feb 2026 10:42:25 +0800
Subject: [PATCH 2/7] [AURON #2015] Add Native Scan Support for Apache Iceberg
 Copy-On-Write Tables.

Signed-off-by: slfan1989
---
 .../apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala
index e6e218661..aae3f576e 100644
--- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.auron.iceberg
 
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters}
+import org.apache.spark.sql.auron.{AuronConverters, AuronConvertProvider}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.auron.plan.NativeIcebergTableScanExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

From 581d87c5231a9507c0a80934cf554f23202ceb1d Mon Sep 17 00:00:00 2001
From: slfan1989
Date: Wed, 18 Feb 2026 10:59:26 +0800
Subject: [PATCH 3/7] [AURON #2015] Add Native Scan Support for Apache Iceberg
 Copy-On-Write Tables.

Signed-off-by: slfan1989
---
 .../scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
index cfe7070ac..56508fd62 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
@@ -42,6 +42,7 @@ trait BaseAuronIcebergSuite extends SharedSparkSession {
         "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
       .set("spark.auron.enable.shuffleExchange", "true")
       .set("spark.auron.enable.project", "false")
+      .set("auron.ui.enabled", "false")
       .set("spark.ui.enabled", "false")
       .set("spark.driver.extraJavaOptions", extraJavaOptions)
       .set("spark.executor.extraJavaOptions", extraJavaOptions)

From d10732371616f997a0379890095cf939b57ca9e3 Mon Sep 17 00:00:00 2001
From: slfan1989
Date: Wed, 18 Feb 2026 11:11:04 +0800
Subject: [PATCH 4/7] [AURON #2015] Add Native Scan Support for Apache Iceberg
 Copy-On-Write Tables.

Signed-off-by: slfan1989
---
 .../scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
index 56508fd62..e88d96bf7 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
@@ -42,7 +42,7 @@ trait BaseAuronIcebergSuite extends SharedSparkSession {
         "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
       .set("spark.auron.enable.shuffleExchange", "true")
       .set("spark.auron.enable.project", "false")
-      .set("auron.ui.enabled", "false")
+      .set("spark.auron.ui.enabled", "false")
       .set("spark.ui.enabled", "false")
       .set("spark.driver.extraJavaOptions", extraJavaOptions)
       .set("spark.executor.extraJavaOptions", extraJavaOptions)

From 20e7f297d024a5b16be11c7fbe3f0f930966f58e Mon Sep 17 00:00:00 2001
From: slfan1989
Date: Wed, 18 Feb 2026 11:24:25 +0800
Subject: [PATCH 5/7] [AURON #2015] Add Native Scan Support for Apache Iceberg
 Copy-On-Write Tables.

Signed-off-by: slfan1989
---
 .github/workflows/iceberg.yml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/.github/workflows/iceberg.yml b/.github/workflows/iceberg.yml
index 038968f6b..22de83219 100644
--- a/.github/workflows/iceberg.yml
+++ b/.github/workflows/iceberg.yml
@@ -74,7 +74,6 @@ jobs:
           -Piceberg
           -DicebergEnabled=true
           -DicebergVersion=1.10.1
-          -DskipBuildNative
           -DskipTests
 
       - name: Test Iceberg Module
@@ -86,7 +85,6 @@ jobs:
           -Piceberg
           -DicebergEnabled=true
           -DicebergVersion=1.10.1
-          -DskipBuildNative
 
       - name: Upload reports
         if: failure()

From e09a8bdaa2abdb8bb7039793f80049e6936a37d7 Mon Sep 17 00:00:00 2001
From: slfan1989
Date: Wed, 18 Feb 2026 11:49:41 +0800
Subject: [PATCH 6/7] [AURON #2015] Add Native Scan Support for Apache Iceberg
 Copy-On-Write Tables.

Signed-off-by: slfan1989
---
 dev/reformat                                         | 11 +++++++++--
 .../auron/iceberg/AuronIcebergIntegrationSuite.scala |  6 +++---
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/dev/reformat b/dev/reformat
index 5a463a639..fe2cd92a0 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -52,11 +52,18 @@ fi
 sparkver=spark-3.5
 for celebornver in celeborn-0.5 celeborn-0.6
 do
-  run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
+  run_maven -P"${sparkver}" -P"${celebornver}" -Puniffle-0.10 -Ppaimon-1.2 -Pflink-1.18 -Piceberg -DicebergEnabled=true -DicebergVersion=1.10.1
 done
 
 
-sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4)
+sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4 spark-3.5)
+for sparkver in "${sparkvers[@]}"
+do
+  run_maven -P"${sparkver}"
+done
+
+SCALA_PROFILE=scala-2.13
+sparkvers=(spark-4.0 spark-4.1)
 for sparkver in "${sparkvers[@]}"
 do
   run_maven -P"${sparkver}"
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
index 1340795e6..23d57d5fa 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
@@ -99,16 +99,16 @@ class AuronIcebergIntegrationSuite
       sql("insert into local.db.t_v1 values (1, 'a'), (2, 'b')")
       val icebergTable = Spark3Util.loadIcebergTable(spark, "local.db.t_v1")
       val scanTasks = icebergTable.newScan().planFiles()
-      val allDeletesNull =
+      val allDeletesEmpty =
         try {
           scanTasks
             .iterator()
             .asScala
-            .forall(task => task.deletes() == null)
+            .forall(task => task.deletes() == null || task.deletes().isEmpty)
         } finally {
           scanTasks.close()
         }
-      assert(allDeletesNull)
+      assert(allDeletesEmpty)
       val df = sql("select * from local.db.t_v1")
       checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
       val plan = df.queryExecution.executedPlan.toString()

From eb482aeba1c2e196b9fbbae9ece0ebfdb7fab3eb Mon Sep 17 00:00:00 2001
From: slfan1989
Date: Wed, 18 Feb 2026 14:08:49 +0800
Subject: [PATCH 7/7] [AURON #2015] Add Native Scan Support for Apache Iceberg
 Copy-On-Write Tables.

Signed-off-by: slfan1989
---
 dev/reformat | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/dev/reformat b/dev/reformat
index fe2cd92a0..d931b7695 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -48,7 +48,7 @@ else
   cargo fmt --all -q --
 fi
 
-# Check or format all code, including third-party code, with spark-3.5
+# Check or format all code, including third-party code, with spark-3.4
 sparkver=spark-3.5
 for celebornver in celeborn-0.5 celeborn-0.6
 do
@@ -56,14 +56,7 @@ do
 done
 
 
-sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4 spark-3.5)
-for sparkver in "${sparkvers[@]}"
-do
-  run_maven -P"${sparkver}"
-done
-
-SCALA_PROFILE=scala-2.13
-sparkvers=(spark-4.0 spark-4.1)
+sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.5)
 for sparkver in "${sparkvers[@]}"
 do
   run_maven -P"${sparkver}"
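
For reference, a minimal end-to-end sketch (not part of the patches above) of exercising the native Iceberg copy-on-write scan that this series adds. It assumes the Auron spark-extension and iceberg-spark-runtime jars are on the session classpath; the app name, master, table name and warehouse path are illustrative, and the session settings reuse only keys that appear in BaseAuronIcebergSuite and SparkAuronConfiguration. The test suite additionally sets the Auron shuffle manager and the --add-opens JVM options, which a real deployment may also need.

import org.apache.spark.sql.SparkSession

// Settings mirror BaseAuronIcebergSuite; names and paths below are placeholders.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("auron-iceberg-cow-scan-demo")
  .config(
    "spark.sql.extensions",
    "org.apache.spark.sql.auron.AuronSparkSessionExtension," +
      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.local.type", "hadoop")
  .config("spark.sql.catalog.local.warehouse", "iceberg_warehouse")
  .config("spark.auron.enabled", "true")
  .config("spark.auron.enable.iceberg.scan", "true")
  .getOrCreate()

// A copy-on-write table with no delete files and a supported schema should be
// planned as NativeIcebergTableScan; merge-on-read deletes, metadata columns,
// and residual filters fall back to Spark's own scan.
spark.sql("create table local.db.demo using iceberg as select 1 as id, 'a' as v")
val df = spark.sql("select * from local.db.demo")
df.collect()
assert(df.queryExecution.executedPlan.toString.contains("NativeIcebergTableScan"))

The executedPlan string check is the same verification the integration suite uses.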