diff --git a/.github/workflows/iceberg.yml b/.github/workflows/iceberg.yml
new file mode 100644
index 000000000..22de83219
--- /dev/null
+++ b/.github/workflows/iceberg.yml
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: Iceberg
+
+on:
+ workflow_dispatch:
+ push:
+ branches:
+ - master
+ - branch-*
+ pull_request:
+ branches:
+ - master
+ - branch-*
+
+concurrency:
+ group: iceberg-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+ cancel-in-progress: true
+
+jobs:
+ test-iceberg:
+ name: Test Iceberg (${{ matrix.sparkver }} / JDK${{ matrix.javaver }} / Scala${{ matrix.scalaver }})
+ runs-on: ubuntu-24.04
+ strategy:
+ fail-fast: false
+ matrix:
+ include:
+ - sparkver: "3.4"
+ scalaver: "2.12"
+ javaver: "17"
+ - sparkver: "3.5"
+ scalaver: "2.12"
+ javaver: "17"
+ - sparkver: "3.5"
+ scalaver: "2.12"
+ javaver: "21"
+ - sparkver: "4.0"
+ scalaver: "2.13"
+ javaver: "21"
+
+ steps:
+ - name: Checkout Auron
+ uses: actions/checkout@v6
+
+ - name: Setup Java and Maven cache
+ uses: actions/setup-java@v5
+ with:
+ distribution: 'adopt-hotspot'
+ java-version: ${{ matrix.javaver }}
+ cache: 'maven'
+
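+      # -pl limits the build to the Iceberg module and -am also builds its upstream Auron
+      # modules; -DicebergEnabled/-DicebergVersion feed the enforcer checks added in the poms.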
+ - name: Build dependencies (skip tests)
+ run: >
+ ./build/mvn -B install
+ -pl thirdparty/auron-iceberg
+ -am
+ -Pscala-${{ matrix.scalaver }}
+ -Pspark-${{ matrix.sparkver }}
+ -Piceberg
+ -DicebergEnabled=true
+ -DicebergVersion=1.10.1
+ -DskipTests
+
+ - name: Test Iceberg Module
+ run: >
+ ./build/mvn -B test
+ -pl thirdparty/auron-iceberg
+ -Pscala-${{ matrix.scalaver }}
+ -Pspark-${{ matrix.sparkver }}
+ -Piceberg
+ -DicebergEnabled=true
+ -DicebergVersion=1.10.1
+
+ - name: Upload reports
+ if: failure()
+ uses: actions/upload-artifact@v6
+ with:
+ name: auron-iceberg-${{ matrix.sparkver }}-jdk${{ matrix.javaver }}-test-report
+ path: thirdparty/auron-iceberg/target/surefire-reports
diff --git a/auron-build.sh b/auron-build.sh
index 47e4a5eab..413698fa5 100755
--- a/auron-build.sh
+++ b/auron-build.sh
@@ -37,7 +37,7 @@ SUPPORTED_CELEBORN_VERSIONS=("0.5" "0.6")
SUPPORTED_UNIFFLE_VERSIONS=("0.10")
SUPPORTED_PAIMON_VERSIONS=("1.2")
SUPPORTED_FLINK_VERSIONS=("1.18")
-SUPPORTED_ICEBERG_VERSIONS=("1.9")
+SUPPORTED_ICEBERG_VERSIONS=("1.10.1")
# -----------------------------------------------------------------------------
# Function: print_help
@@ -270,11 +270,11 @@ while [[ $# -gt 0 ]]; do
print_invalid_option_error Iceberg "$ICEBERG_VER" "${SUPPORTED_ICEBERG_VERSIONS[@]}"
fi
if [ -z "$SPARK_VER" ]; then
- echo "ERROR: Building iceberg requires spark at the same time, and only Spark versions 3.4 or 3.5 are supported."
+ echo "ERROR: Building iceberg requires spark at the same time, and only Spark versions 3.4 to 4.0 are supported."
exit 1
fi
- if [ "$SPARK_VER" != "3.4" ] && [ "$SPARK_VER" != "3.5" ]; then
- echo "ERROR: Building iceberg requires spark versions are 3.4 or 3.5."
+ if [ "$SPARK_VER" != "3.4" ] && [ "$SPARK_VER" != "3.5" ] && [ "$SPARK_VER" != "4.0" ]; then
+ echo "ERROR: Building iceberg requires spark versions are 3.4, 3.5 or 4.0."
exit 1
fi
shift 2
@@ -393,7 +393,7 @@ if [[ -n "$FLINK_VER" ]]; then
BUILD_ARGS+=("-Pflink-$FLINK_VER")
fi
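+# Iceberg is now built through the single "iceberg" profile; the concrete version is passed
+# via -DicebergVersion and the profile is activated with -DicebergEnabled=true.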
if [[ -n "$ICEBERG_VER" ]]; then
- BUILD_ARGS+=("-Piceberg-$ICEBERG_VER")
+ BUILD_ARGS+=("-Piceberg" "-DicebergEnabled=true" "-DicebergVersion=$ICEBERG_VER")
fi
MVN_ARGS=("${CLEAN_ARGS[@]}" "${BUILD_ARGS[@]}")
diff --git a/dev/reformat b/dev/reformat
index 5a463a639..d931b7695 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -48,15 +48,15 @@ else
cargo fmt --all -q --
fi
-# Check or format all code, including third-party code, with spark-3.5
+# Check or format all code, including third-party code, with spark-3.4
-sparkver=spark-3.5
+sparkver=spark-3.4
for celebornver in celeborn-0.5 celeborn-0.6
do
- run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
+ run_maven -P"${sparkver}" -P"${celebornver}" -Puniffle-0.10 -Ppaimon-1.2 -Pflink-1.18 -Piceberg -DicebergEnabled=true -DicebergVersion=1.10.1
done
-sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4)
+sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.5)
for sparkver in "${sparkvers[@]}"
do
run_maven -P"${sparkver}"
diff --git a/pom.xml b/pom.xml
index c93da9234..b9bfd6871 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,11 @@
2.30.0
1.3.0
3.1.0
+ 1.10.1
+ false
+ pre
+ false
+
3.6.0
2.1.1
@@ -206,6 +211,16 @@
       <artifactId>arrow-compression</artifactId>
       <version>${arrowVersion}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>${arrowVersion}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-netty</artifactId>
+      <version>${arrowVersion}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-memory-unsafe</artifactId>
@@ -779,6 +794,32 @@
3.0
4.1.47.Final
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ ${maven-enforcer-plugin.version}
+
+
+ spark30-disallow-iceberg
+
+ enforce
+
+
+
+
+ icebergEnabled
+ false
+ Spark 3.0 is not supported for Iceberg. Use Spark 3.4-4.0 or disable icebergEnabled.
+
+
+
+
+
+
+
+
@@ -790,6 +831,32 @@
3.1
4.1.51.Final
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ ${maven-enforcer-plugin.version}
+
+
+ spark31-disallow-iceberg
+
+ enforce
+
+
+
+
+ icebergEnabled
+ false
+ Spark 3.1 is not supported for Iceberg. Use Spark 3.4-4.0 or disable icebergEnabled.
+
+
+
+
+
+
+
+
@@ -801,6 +868,32 @@
3.2
4.1.68.Final
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ ${maven-enforcer-plugin.version}
+
+
+ spark32-disallow-iceberg
+
+ enforce
+
+
+
+
+ icebergEnabled
+ false
+ Spark 3.2 is not supported for Iceberg. Use Spark 3.4-4.0 or disable icebergEnabled.
+
+
+
+
+
+
+
+
@@ -812,6 +905,32 @@
3.3
4.1.74.Final
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ ${maven-enforcer-plugin.version}
+
+
+ spark33-disallow-iceberg
+
+ enforce
+
+
+
+
+ icebergEnabled
+ false
+ Spark 3.3 is not supported for Iceberg. Use Spark 3.4-4.0 or disable icebergEnabled.
+
+
+
+
+
+
+
+
@@ -822,7 +941,34 @@
3.4.4
3.4
4.1.87.Final
+ 1.10.1
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ ${maven-enforcer-plugin.version}
+
+
+ spark34-enforce-iceberg-version
+
+ enforce
+
+
+
+
+ icebergVersion
+ 1\.10\.1
+ Spark 3.4 requires Iceberg 1.10.1. Current: ${icebergVersion}
+
+
+
+
+
+
+
+
@@ -833,7 +979,34 @@
3.5.8
3.5
4.1.96.Final
+ 1.10.1
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ ${maven-enforcer-plugin.version}
+
+
+ spark35-enforce-iceberg-version
+
+ enforce
+
+
+
+
+ icebergVersion
+ 1\.10\.1
+ Spark 3.5 requires Iceberg 1.10.1. Current: ${icebergVersion}
+
+
+
+
+
+
+
+
@@ -844,6 +1017,7 @@
4.0.2
4.0
4.1.111.Final
+ 1.10.1
@@ -872,6 +1046,21 @@
+
+ spark40-enforce-iceberg-version
+
+ enforce
+
+
+
+
+ icebergVersion
+ 1\.10\.1
+ Spark 4.0 requires Iceberg 1.10.1. Current: ${icebergVersion}
+
+
+
+
@@ -914,6 +1103,21 @@
+
+ spark41-disallow-iceberg
+
+ enforce
+
+
+
+
+ icebergEnabled
+ false
+ Spark 4.1 is not supported for Iceberg. Use Spark 3.4-4.0 or disable icebergEnabled.
+
+
+
+
@@ -1110,13 +1314,16 @@
-      <id>iceberg-1.9</id>
+      <id>iceberg</id>
+      <activation>
+        <property>
+          <name>icebergEnabled</name>
+          <value>true</value>
+        </property>
+      </activation>
         <module>thirdparty/auron-iceberg</module>
-      <properties>
-        <icebergVersion>1.9.2</icebergVersion>
-      </properties>
diff --git a/spark-extension/pom.xml b/spark-extension/pom.xml
index 0d6cf7c1e..7d26257c9 100644
--- a/spark-extension/pom.xml
+++ b/spark-extension/pom.xml
@@ -80,6 +80,14 @@
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-compression</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-netty</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-memory-unsafe</artifactId>
diff --git a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
index 8846700f1..3cc1e534a 100644
--- a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
+++ b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
@@ -346,6 +346,12 @@ public class SparkAuronConfiguration extends AuronConfiguration {
.withDescription("Enable PaimonScanExec operation conversion to native Auron implementations.")
.withDefaultValue(true);
+    public static final ConfigOption<Boolean> ENABLE_ICEBERG_SCAN = new SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.iceberg.scan")
+ .withCategory("Operator Supports")
+ .withDescription("Enable Iceberg scan operation conversion to native Auron implementations.")
+ .withDefaultValue(true);
+
    public static final ConfigOption<Boolean> ENABLE_PROJECT = new SQLConfOption<>(Boolean.class)
.withKey("auron.enable.project")
.withCategory("Operator Supports")
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index def645d51..8ff41836b 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -126,7 +126,9 @@ object AuronConverters extends Logging {
Seq("AuronShuffleManager", "AuronUniffleShuffleManager", "AuronCelebornShuffleManager")
def supportedShuffleManager: Boolean = {
- val name = SQLConf.get.getConfString(config.SHUFFLE_MANAGER.key)
+ val name = SQLConf.get.getConfString(
+ config.SHUFFLE_MANAGER.key,
+ config.SHUFFLE_MANAGER.defaultValueString)
supportedShuffleManagers.exists(name.contains)
}
diff --git a/thirdparty/auron-iceberg/pom.xml b/thirdparty/auron-iceberg/pom.xml
index 44b247ae0..bb686f481 100644
--- a/thirdparty/auron-iceberg/pom.xml
+++ b/thirdparty/auron-iceberg/pom.xml
@@ -31,6 +31,11 @@
Apache Auron Iceberg ${icebergVersion} ${scalaVersion}
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>spark-extension_${scalaVersion}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
org.apache.iceberg
iceberg-spark-runtime-${shortSparkVersion}_${scalaVersion}
@@ -70,6 +75,49 @@
spark-sql_${scalaVersion}
test-jar
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>spark-extension-shims-spark_${scalaVersion}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>${maven-enforcer-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>iceberg-spark-version-compat</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <requireProperty>
+                  <property>icebergEnabled</property>
+                  <regex>true</regex>
+                  <regexMessage>Iceberg module requires icebergEnabled=true.</regexMessage>
+                </requireProperty>
+                <requireProperty>
+                  <property>shortSparkVersion</property>
+                  <regex>3\.4|3\.5|4\.0</regex>
+                  <regexMessage>Iceberg integration supports Spark 3.4-4.0 only. Current: ${shortSparkVersion}</regexMessage>
+                </requireProperty>
+                <requireProperty>
+                  <property>icebergVersion</property>
+                  <regex>1\.10\.1</regex>
+                  <regexMessage>Iceberg integration supports only Iceberg 1.10.1. Current: ${icebergVersion}</regexMessage>
+                </requireProperty>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
diff --git a/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider b/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
new file mode 100644
index 000000000..409e48d68
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.sql.auron.iceberg.IcebergConvertProvider
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala
new file mode 100644
index 000000000..aae3f576e
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.iceberg
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.{AuronConverters, AuronConvertProvider}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.auron.plan.NativeIcebergTableScanExec
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+import org.apache.auron.util.SemanticVersion
+
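+// Registered in META-INF/services (see this module's resources), so Auron's converter
+// framework can discover the Iceberg provider at runtime via ServiceLoader.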
+class IcebergConvertProvider extends AuronConvertProvider with Logging {
+
+ override def isEnabled: Boolean = {
+ val enabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get()
+ if (!enabled) {
+ return false
+ }
+ if (!sparkCompatible) {
+ logWarning(
+ s"Disable Iceberg native scan: Spark $SPARK_VERSION is not supported. " +
+ s"Supported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).")
+ return false
+ }
+ true
+ }
+
+ override def isSupported(exec: SparkPlan): Boolean = {
+ exec match {
+ case e: BatchScanExec => IcebergScanSupport.plan(e).nonEmpty
+ case _ => false
+ }
+ }
+
+ override def convert(exec: SparkPlan): SparkPlan = {
+ exec match {
+ case e: BatchScanExec =>
+ IcebergScanSupport.plan(e) match {
+ case Some(plan) =>
+ AuronConverters.addRenameColumnsExec(NativeIcebergTableScanExec(e, plan))
+ case None =>
+ exec
+ }
+ case _ => exec
+ }
+ }
+
+ private lazy val sparkCompatible: Boolean = {
+ SemanticVersion(SPARK_VERSION) >= "3.4" && SemanticVersion(SPARK_VERSION) < "4.1"
+ }
+
+ private def icebergVersionOrUnknown: String = {
+ val pkg = classOf[org.apache.iceberg.Table].getPackage
+ val version = if (pkg != null) pkg.getImplementationVersion else null
+ Option(version).getOrElse("unknown")
+ }
+}
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala
new file mode 100644
index 000000000..7386161e7
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.iceberg
+
+import scala.collection.JavaConverters._
+
+import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns}
+import org.apache.iceberg.expressions.Expressions
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.NativeConverters
+import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.types.StructType
+
+final case class IcebergScanPlan(
+ fileTasks: Seq[FileScanTask],
+ fileFormat: FileFormat,
+ readSchema: StructType)
+
+object IcebergScanSupport extends Logging {
+
+ def plan(exec: BatchScanExec): Option[IcebergScanPlan] = {
+ val scan = exec.scan
+ val scanClassName = scan.getClass.getName
+ // Only handle Iceberg scans; other sources must stay on Spark's path.
+ if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) {
+ return None
+ }
+
+ // Changelog scan carries row-level changes; not supported by native COW-only path.
+ if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan") {
+ return None
+ }
+
+ val readSchema = scan.readSchema
+ // Native scan does not support Iceberg metadata columns (e.g. _file, _pos).
+ if (hasMetadataColumns(readSchema)) {
+ return None
+ }
+
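+    // Fall back unless every read column has a data type supported by the native engine.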
+ if (!readSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
+ return None
+ }
+
+ val partitions = inputPartitions(exec)
+ // Empty scan (e.g. empty table) should still build a plan to return no rows.
+ if (partitions.isEmpty) {
+ return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema))
+ }
+
+ val icebergPartitions = partitions.flatMap(icebergPartition)
+ // All partitions must be Iceberg SparkInputPartition; otherwise fallback.
+ if (icebergPartitions.size != partitions.size) {
+ return None
+ }
+
+ val fileTasks = icebergPartitions.flatMap(_.fileTasks)
+
+ // Native scan does not apply delete files; only allow pure data files (COW).
+ if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) {
+ return None
+ }
+
+ // Residual filters require row-level evaluation, not supported in native scan.
+ if (!fileTasks.forall(task => Expressions.alwaysTrue().equals(task.residual()))) {
+ return None
+ }
+
+ // Native scan handles a single file format; mixed formats must fallback.
+ val formats = fileTasks.map(_.file().format()).distinct
+ if (formats.size > 1) {
+ return None
+ }
+
+ val format = formats.headOption.getOrElse(FileFormat.PARQUET)
+ if (format != FileFormat.PARQUET && format != FileFormat.ORC) {
+ return None
+ }
+
+ Some(IcebergScanPlan(fileTasks, format, readSchema))
+ }
+
+ private def hasMetadataColumns(schema: StructType): Boolean =
+ schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name))
+
+ private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = {
+ // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection.
+ val fromBatch =
+ try {
+ val batch = exec.scan.toBatch
+ if (batch != null) {
+ batch.planInputPartitions().toSeq
+ } else {
+ Seq.empty
+ }
+ } catch {
+ case _: Throwable => Seq.empty
+ }
+ if (fromBatch.nonEmpty) {
+ return fromBatch
+ }
+
+ // Some Spark versions expose partitions through inputPartitions/partitions methods on BatchScanExec.
+ val methods = exec.getClass.getMethods
+ val inputPartitionsMethod = methods.find(_.getName == "inputPartitions")
+ val partitionsMethod = methods.find(_.getName == "partitions")
+
+ val raw = inputPartitionsMethod
+ .orElse(partitionsMethod)
+ .map(_.invoke(exec))
+ .getOrElse(Seq.empty)
+
+ // Normalize to Seq[InputPartition], flattening nested Seq if needed.
+ raw match {
+ case seq: scala.collection.Seq[_]
+ if seq.nonEmpty &&
+ seq.head.isInstanceOf[scala.collection.Seq[_]] =>
+ seq.asInstanceOf[scala.collection.Seq[scala.collection.Seq[InputPartition]]].flatten.toSeq
+ case seq: scala.collection.Seq[_] =>
+ seq.asInstanceOf[scala.collection.Seq[InputPartition]].toSeq
+ case _ =>
+ Seq.empty
+ }
+ }
+
+ private case class IcebergPartitionView(fileTasks: Seq[FileScanTask])
+
+ private def icebergPartition(partition: InputPartition): Option[IcebergPartitionView] = {
+ val className = partition.getClass.getName
+ // Only accept Iceberg SparkInputPartition to access task groups.
+ if (className != "org.apache.iceberg.spark.source.SparkInputPartition") {
+ return None
+ }
+
+ try {
+ // SparkInputPartition is package-private; use reflection to read its task group.
+ val taskGroupField = partition.getClass.getDeclaredField("taskGroup")
+ taskGroupField.setAccessible(true)
+ val taskGroup = taskGroupField.get(partition)
+
+ // Extract tasks and keep only file scan tasks.
+ val tasksMethod = taskGroup.getClass.getDeclaredMethod("tasks")
+ tasksMethod.setAccessible(true)
+ val tasks = tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]].asScala
+ val fileTasks = tasks.collect { case task: FileScanTask => task }.toSeq
+
+ // If any task is not a FileScanTask, fallback.
+ if (fileTasks.size != tasks.size) {
+ return None
+ }
+
+ Some(IcebergPartitionView(fileTasks))
+ } catch {
+ case _: ReflectiveOperationException => None
+ }
+ }
+}
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala
new file mode 100644
index 000000000..63927e0b0
--- /dev/null
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.auron.plan
+
+import java.net.URI
+import java.security.PrivilegedExceptionAction
+import java.util.Locale
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.iceberg.{FileFormat, FileScanTask}
+import org.apache.spark.Partition
+import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims}
+import org.apache.spark.sql.auron.iceberg.IcebergScanPlan
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.auron.{protobuf => pb}
+import org.apache.auron.jni.JniBridge
+import org.apache.auron.metric.SparkMetricNode
+
+case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergScanPlan)
+ extends LeafExecNode
+ with NativeSupports
+ with Logging {
+
+ override lazy val metrics: Map[String, SQLMetric] =
+ NativeHelper.getNativeFileScanMetrics(sparkContext)
+
+ override val output = basedScan.output
+ override val outputPartitioning = basedScan.outputPartitioning
+
+ private lazy val readSchema: StructType = plan.readSchema
+ private lazy val fileTasks: Seq[FileScanTask] = plan.fileTasks
+
+ private lazy val partitions: Array[FilePartition] = buildFilePartitions()
+ private lazy val fileSizes: Map[String, Long] = buildFileSizes()
+
+ private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(readSchema)
+ private lazy val nativePartitionSchema: pb.Schema =
+ NativeConverters.convertSchema(StructType(Nil))
+
+ private lazy val caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis
+
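+  // Resolve output attribute names against the read schema, honoring spark.sql.caseSensitive.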
+ private lazy val fieldIndexByName: Map[String, Int] = {
+ if (caseSensitive) {
+ readSchema.fieldNames.zipWithIndex.toMap
+ } else {
+ readSchema.fieldNames.map(_.toLowerCase(Locale.ROOT)).zipWithIndex.toMap
+ }
+ }
+
+ private def fieldIndexFor(name: String): Int = {
+ if (caseSensitive) {
+ fieldIndexByName.getOrElse(name, readSchema.fieldIndex(name))
+ } else {
+ fieldIndexByName.getOrElse(name.toLowerCase(Locale.ROOT), readSchema.fieldIndex(name))
+ }
+ }
+
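+  // Read-schema column indices the native reader should project, in output attribute order.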
+ private lazy val projection: Seq[Integer] =
+ output.map(attr => Integer.valueOf(fieldIndexFor(attr.name)))
+
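+  // Convert a Spark FilePartition into the protobuf FileGroup consumed by the native reader,
+  // reporting the full file size from Iceberg metadata while the range covers only this split.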
+ private lazy val nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) =>
+ {
+ val nativePartitionedFile = (file: PartitionedFile) => {
+ val filePath = file.filePath.toString
+ val size = fileSizes.getOrElse(filePath, file.length)
+ pb.PartitionedFile
+ .newBuilder()
+ .setPath(filePath)
+ .setSize(size)
+ .setLastModifiedNs(0)
+ .setRange(
+ pb.FileRange
+ .newBuilder()
+ .setStart(file.start)
+ .setEnd(file.start + file.length)
+ .build())
+ .build()
+ }
+ pb.FileGroup
+ .newBuilder()
+ .addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava)
+ .build()
+ }
+
+ override def doExecuteNative(): NativeRDD = {
+ if (partitions.isEmpty) {
+ return new EmptyNativeRDD(sparkContext)
+ }
+
+ val nativeMetrics = SparkMetricNode(
+ metrics,
+ Nil,
+ Some({
+ case ("bytes_scanned", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incBytesRead(v)
+ case ("output_rows", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incRecordsRead(v)
+ case _ =>
+ }))
+
+ val fileFormat = plan.fileFormat
+ val broadcastedHadoopConf = this.broadcastedHadoopConf
+ val numPartitions = partitions.length
+
+ // Build per-partition native scan plans and execute them via NativeRDD.
+ new NativeRDD(
+ sparkContext,
+ nativeMetrics,
+ partitions.asInstanceOf[Array[Partition]],
+ None,
+ Nil,
+ rddShuffleReadFull = true,
+ (partition, _) => {
+ // Register the Hadoop conf for native readers via a per-task resource id.
+ val resourceId = s"NativeIcebergTableScan:${UUID.randomUUID().toString}"
+ putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+ // Convert Spark FilePartition to native FileGroup.
+ val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition])
+ val nativeFileScanConf = pb.FileScanExecConf
+ .newBuilder()
+ .setNumPartitions(numPartitions)
+ .setPartitionIndex(partition.index)
+ .setStatistics(pb.Statistics.getDefaultInstance)
+ .setSchema(nativeFileSchema)
+ .setFileGroup(nativeFileGroup)
+ .addAllProjection(projection.asJava)
+ .setPartitionSchema(nativePartitionSchema)
+ .build()
+
+ // Choose a native scan node based on file format.
+ if (fileFormat == FileFormat.ORC) {
+ val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeFileScanConf)
+ .setFsResourceId(resourceId)
+ // No pruning predicates are pushed down in the native scan yet.
+ .addAllPruningPredicates(new java.util.ArrayList())
+
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setOrcScan(nativeOrcScanExecBuilder.build())
+ .build()
+ } else {
+ val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeFileScanConf)
+ .setFsResourceId(resourceId)
+ // No pruning predicates are pushed down in the native scan yet.
+ .addAllPruningPredicates(new java.util.ArrayList())
+
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setParquetScan(nativeParquetScanExecBuilder.build())
+ .build()
+ }
+ },
+ friendlyName = "NativeRDD.IcebergScan")
+ }
+
+ override val nodeName: String = "NativeIcebergTableScan"
+
+ // Delegate canonicalization to the original scan to keep plan equivalence checks consistent.
+ override protected def doCanonicalize(): SparkPlan = basedScan.canonicalized
+
+ private def buildFileSizes(): Map[String, Long] = {
+ // Map file path to full file size; tasks may split a file into multiple ranges.
+ fileTasks
+ .map(task => task.file().location() -> task.file().fileSizeInBytes())
+ .groupBy(_._1)
+ .mapValues(_.head._2)
+ .toMap
+ }
+
+ private def buildFilePartitions(): Array[FilePartition] = {
+ // Convert Iceberg file tasks into Spark FilePartition groups for execution.
+ if (fileTasks.isEmpty) {
+ return Array.empty
+ }
+
+ val sparkSession = Shims.get.getSqlContext(basedScan).sparkSession
+ val maxSplitBytes = getMaxSplitBytes(sparkSession, fileTasks)
+ val partitionedFiles = fileTasks
+ .map { task =>
+ val filePath = task.file().location()
+ Shims.get.getPartitionedFile(InternalRow.empty, filePath, task.start(), task.length())
+ }
+ .sortBy(_.length)(Ordering[Long].reverse)
+ .toSeq
+
+ FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes).toArray
+ }
+
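+  // Mirrors Spark's FilePartition.maxSplitBytes sizing, computing total bytes from Iceberg
+  // file sizes plus the configured open cost per file.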
+ private def getMaxSplitBytes(sparkSession: SparkSession, tasks: Seq[FileScanTask]): Long = {
+ val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+ val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+ val minPartitionNum = Shims.get.getMinPartitionNum(sparkSession)
+ val totalBytes = tasks
+ .map(task => task.file().fileSizeInBytes() + openCostInBytes)
+ .sum
+ val bytesPerCore = if (minPartitionNum > 0) totalBytes / minPartitionNum else totalBytes
+
+ Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ }
+
+ private def putJniBridgeResource(
+ resourceId: String,
+ broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = {
+ val sharedConf = broadcastedHadoopConf.value.value
+ JniBridge.putResource(
+ resourceId,
+ (location: String) => {
+ val getFsTimeMetric = metrics("io_time_getfs")
+ val currentTimeMillis = System.currentTimeMillis()
+ val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] {
+ override def run(): FileSystem = FileSystem.get(new URI(location), sharedConf)
+ })
+ getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000)
+ fs
+ })
+ }
+
+ private def broadcastedHadoopConf: Broadcast[SerializableConfiguration] = {
+ val sparkSession = Shims.get.getSqlContext(basedScan).sparkSession
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(Map.empty)
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+ }
+}
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
index d16d47b4f..23d57d5fa 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
@@ -16,6 +16,14 @@
*/
package org.apache.auron.iceberg
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.iceberg.{FileFormat, FileScanTask}
+import org.apache.iceberg.data.{GenericAppenderFactory, Record}
+import org.apache.iceberg.deletes.PositionDelete
+import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.sql.Row
class AuronIcebergIntegrationSuite
@@ -31,4 +39,184 @@ class AuronIcebergIntegrationSuite
}
}
+ test("iceberg native scan is applied for simple COW table") {
+ withTable("local.db.t2") {
+ sql("create table local.db.t2 using iceberg as select 1 as id, 'a' as v")
+ val df = sql("select * from local.db.t2")
+ df.collect()
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg native scan is applied for projection on COW table") {
+ withTable("local.db.t3") {
+ sql("create table local.db.t3 using iceberg as select 1 as id, 'a' as v")
+ val df = sql("select id from local.db.t3")
+ checkAnswer(df, Seq(Row(1)))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg native scan is applied for partitioned COW table with filter") {
+ withTable("local.db.t_partition") {
+ sql("""
+ |create table local.db.t_partition (id int, v string, p string)
+ |using iceberg
+ |partitioned by (p)
+ |""".stripMargin)
+ sql("insert into local.db.t_partition values (1, 'a', 'p1'), (2, 'b', 'p2')")
+ val df = sql("select * from local.db.t_partition where p = 'p1'")
+ checkAnswer(df, Seq(Row(1, "a", "p1")))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg native scan is applied for ORC COW table") {
+ withTable("local.db.t_orc") {
+ sql("""
+ |create table local.db.t_orc (id int, v string)
+ |using iceberg
+ |tblproperties ('write.format.default' = 'orc')
+ |""".stripMargin)
+ sql("insert into local.db.t_orc values (1, 'a'), (2, 'b')")
+ val df = sql("select * from local.db.t_orc")
+ checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg native scan is applied when delete files are null (format v1)") {
+ withTable("local.db.t_v1") {
+ sql("""
+ |create table local.db.t_v1 (id int, v string)
+ |using iceberg
+ |tblproperties ('format-version' = '1')
+ |""".stripMargin)
+ sql("insert into local.db.t_v1 values (1, 'a'), (2, 'b')")
+ val icebergTable = Spark3Util.loadIcebergTable(spark, "local.db.t_v1")
+ val scanTasks = icebergTable.newScan().planFiles()
+ val allDeletesEmpty =
+ try {
+ scanTasks
+ .iterator()
+ .asScala
+ .forall(task => task.deletes() == null || task.deletes().isEmpty)
+ } finally {
+ scanTasks.close()
+ }
+ assert(allDeletesEmpty)
+ val df = sql("select * from local.db.t_v1")
+ checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg scan falls back for residual filters on data columns") {
+ withTable("local.db.t_residual") {
+ sql("create table local.db.t_residual (id int, v string) using iceberg")
+ sql("insert into local.db.t_residual values (1, 'a'), (2, 'b')")
+ val df = sql("select * from local.db.t_residual where v = 'a'")
+ checkAnswer(df, Seq(Row(1, "a")))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg scan falls back when reading metadata columns") {
+ withTable("local.db.t4") {
+ sql("create table local.db.t4 using iceberg as select 1 as id, 'a' as v")
+ val df = sql("select _file from local.db.t4")
+ df.collect()
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg scan falls back for unsupported decimal types") {
+ withTable("local.db.t5") {
+ sql("create table local.db.t5 (id int, amount decimal(38, 10)) using iceberg")
+ sql("insert into local.db.t5 values (1, 123.45)")
+ val df = sql("select * from local.db.t5")
+ checkAnswer(df, Seq(Row(1, new java.math.BigDecimal("123.4500000000"))))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg scan falls back when delete files exist") {
+ withTable("local.db.t_delete") {
+ sql("""
+ |create table local.db.t_delete (id int, v string)
+ |using iceberg
+ |tblproperties (
+ | 'format-version' = '2',
+ | 'write.delete.mode' = 'merge-on-read'
+ |)
+ |""".stripMargin)
+ sql("insert into local.db.t_delete values (1, 'a'), (2, 'b')")
+ addPositionDeleteFile("local.db.t_delete")
+ val icebergTable = Spark3Util.loadIcebergTable(spark, "local.db.t_delete")
+ val scanTasks = icebergTable.newScan().planFiles()
+ val hasDeletes =
+ try {
+ scanTasks
+ .iterator()
+ .asScala
+ .exists(task => task.deletes() != null && !task.deletes().isEmpty)
+ } finally {
+ scanTasks.close()
+ }
+ assert(hasDeletes)
+ val df = sql("select * from local.db.t_delete")
+ df.collect()
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg scan is disabled via spark.auron.enable.iceberg.scan") {
+ withTable("local.db.t_disable") {
+ sql("create table local.db.t_disable using iceberg as select 1 as id, 'a' as v")
+ withSQLConf("spark.auron.enable.iceberg.scan" -> "false") {
+ assert(
+ !org.apache.auron.spark.configuration.SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get())
+ val df = sql("select * from local.db.t_disable")
+ df.collect()
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativeIcebergTableScan"))
+ }
+ }
+ }
+
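+  // Writes a position delete file with Iceberg's generic appender and commits it as a row
+  // delta, giving the table delete files without going through a Spark DELETE statement.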
+ private def addPositionDeleteFile(tableName: String): Unit = {
+ val table = Spark3Util.loadIcebergTable(spark, tableName)
+ val taskIterable = table.newScan().planFiles()
+ val taskIter = taskIterable.iterator()
+ if (!taskIter.hasNext) {
+ taskIterable.close()
+ return
+ }
+
+ val task = taskIter.next().asInstanceOf[FileScanTask]
+ val deletePath =
+ table.locationProvider().newDataLocation(s"delete-${UUID.randomUUID().toString}.parquet")
+ val outputFile = table.io().newOutputFile(deletePath)
+ val encryptedOutput = table.encryption().encrypt(outputFile)
+ val appenderFactory = new GenericAppenderFactory(table.schema(), table.spec())
+ val writer =
+ appenderFactory.newPosDeleteWriter(encryptedOutput, FileFormat.PARQUET, task.partition())
+
+ val delete = PositionDelete.create[Record]().set(task.file().location(), 0L, null)
+ writer.write(delete)
+ writer.close()
+
+ val deleteFile = writer.toDeleteFile()
+ table.newRowDelta().addDeletes(deleteFile).commit()
+ taskIterable.close()
+ }
}
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
index 59ffbd5a2..e88d96bf7 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala
@@ -22,14 +22,29 @@ import org.apache.spark.sql.test.SharedSparkSession
trait BaseAuronIcebergSuite extends SharedSparkSession {
override protected def sparkConf: SparkConf = {
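+    // Arrow and Netty need reflective access to java.base internals on JDK 17+, so the
+    // required --add-opens flags are passed to both driver and executor JVMs.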
+ val extraJavaOptions =
+ "--add-opens=java.base/java.lang=ALL-UNNAMED " +
+ "--add-opens=java.base/java.nio=ALL-UNNAMED " +
+ "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " +
+ "-Dio.netty.tryReflectionSetAccessible=true"
super.sparkConf
.set(
"spark.sql.extensions",
- "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+ "org.apache.spark.sql.auron.AuronSparkSessionExtension," +
+ "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.local.type", "hadoop")
.set("spark.sql.catalog.local.warehouse", "iceberg_warehouse")
+ .set("spark.auron.enabled", "true")
+ .set(
+ "spark.shuffle.manager",
+ "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
+ .set("spark.auron.enable.shuffleExchange", "true")
+ .set("spark.auron.enable.project", "false")
+ .set("spark.auron.ui.enabled", "false")
.set("spark.ui.enabled", "false")
+ .set("spark.driver.extraJavaOptions", extraJavaOptions)
+ .set("spark.executor.extraJavaOptions", extraJavaOptions)
}
}