From cfeda53c30a3f60d1b690576948701c438eb168d Mon Sep 17 00:00:00 2001 From: Yann Date: Thu, 9 Apr 2026 10:11:56 +0800 Subject: [PATCH] [spark] support scala 2.13 --- .github/workflows/ci-template.yaml | 10 +++++++--- .github/workflows/stage.sh | 4 ++++ .../org/apache/fluss/spark/read/FlussLakeBatch.scala | 8 ++++---- .../org/apache/fluss/spark/row/DataConverterTest.scala | 6 +++--- .../apache/fluss/spark/row/FlussAsSparkArrayTest.scala | 6 +++--- .../apache/fluss/spark/row/FlussAsSparkRowTest.scala | 4 ++-- pom.xml | 8 ++++++++ 7 files changed, 31 insertions(+), 15 deletions(-) diff --git a/.github/workflows/ci-template.yaml b/.github/workflows/ci-template.yaml index b23e3f0a1a..064c4ffaa6 100644 --- a/.github/workflows/ci-template.yaml +++ b/.github/workflows/ci-template.yaml @@ -36,7 +36,11 @@ jobs: strategy: fail-fast: false matrix: - module: [ core, flink, spark3, lake ] + module: [ core, flink, spark3, spark3-scala213, lake ] + include: + - module: spark3-scala213 + scala-profile: "-Pscala-2.13" + test-profile: "test-spark3" name: "${{ matrix.module }}" steps: - name: Checkout code @@ -48,14 +52,14 @@ jobs: distribution: 'temurin' - name: Build run: | - mvn -T 1C -B clean install -DskipTests ${{ inputs.maven-parameters }} + mvn -T 1C -B clean install -DskipTests ${{ matrix.scala-profile }} ${{ inputs.maven-parameters }} - name: Test timeout-minutes: 60 run: | TEST_MODULES=$(./.github/workflows/stage.sh ${{ matrix.module }}) echo "github ref: ${{ github.ref }}" echo "Start testing modules: $TEST_MODULES" - mvn -B verify $TEST_MODULES -Ptest-coverage -Ptest-${{ matrix.module }} -Dlog.dir=${{ runner.temp }}/fluss-logs -Dlog4j.configurationFile=${{ github.workspace }}/tools/ci/log4j.properties ${{ inputs.maven-parameters }} + mvn -B verify $TEST_MODULES -Ptest-coverage -P${{ matrix.test-profile || format('test-{0}', matrix.module) }} ${{ matrix.scala-profile }} -Dlog.dir=${{ runner.temp }}/fluss-logs -Dlog4j.configurationFile=${{ github.workspace 
}}/tools/ci/log4j.properties ${{ inputs.maven-parameters }} env: MAVEN_OPTS: -Xmx4096m ARTIFACTS_OSS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_ENDPOINT }} diff --git a/.github/workflows/stage.sh b/.github/workflows/stage.sh index b35b74e47f..7fef1301dc 100755 --- a/.github/workflows/stage.sh +++ b/.github/workflows/stage.sh @@ -20,6 +20,7 @@ STAGE_CORE="core" STAGE_FLINK="flink" STAGE_SPARK="spark3" +STAGE_SPARK_SCALA213="spark3-scala213" STAGE_LAKE="lake" MODULES_FLINK="\ @@ -74,6 +75,9 @@ function get_test_modules_for_stage() { (${STAGE_SPARK}) echo "-Pspark3 -pl fluss-test-coverage,$modules_spark3" ;; + (${STAGE_SPARK_SCALA213}) + echo "-Pspark3,scala-2.13 -pl fluss-test-coverage,$modules_spark3" + ;; (${STAGE_LAKE}) echo "-pl fluss-test-coverage,$modules_lake" ;; diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala index 70d5ef45c0..ec944f2040 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala @@ -102,14 +102,14 @@ class FlussLakeAppendBatch( val partitions = if (tableInfo.isPartitioned) { planPartitionedTable( - lakeSplits.asScala, + lakeSplits.asScala.toSeq, splitSerializer, tableBucketsOffset, buckets, bucketOffsetsRetriever) } else { planNonPartitionedTable( - lakeSplits.asScala, + lakeSplits.asScala.toSeq, splitSerializer, tableBucketsOffset, buckets, @@ -163,7 +163,7 @@ class FlussLakeAppendBatch( case Some(partitionId) => // Partition in both lake and Fluss — lake splits + log tail val lakePartitions = - createLakePartitions(splits, splitSerializer, tableId, Some(partitionId)) + createLakePartitions(splits.toSeq, splitSerializer, tableId, Some(partitionId)) val stoppingOffsets = getBucketOffsets( stoppingOffsetsInitializer, @@ -182,7 +182,7 @@ class 
FlussLakeAppendBatch( // Partition only in lake (expired in Fluss) — lake splits only val pid = lakeSplitPartitionId lakeSplitPartitionId -= 1 - createLakePartitions(splits, splitSerializer, tableId, Some(pid)) + createLakePartitions(splits.toSeq, splitSerializer, tableId, Some(pid)) } }.toSeq diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala index 6866ba7b60..d1847de547 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala @@ -77,21 +77,21 @@ class DataConverterTest extends AnyFunSuite { val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal) assertThat(sparkDecimal).isInstanceOf(classOf[SparkDecimal]) - assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45) + assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(123.45) } test("toSparkDecimal: negative decimal") { val flussDecimal = FlussDecimal.fromBigDecimal(new java.math.BigDecimal("-999.99"), 5, 2) val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal) - assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(-999.99) + assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(-999.99) } test("toSparkDecimal: zero") { val flussDecimal = FlussDecimal.fromBigDecimal(java.math.BigDecimal.ZERO, 5, 2) val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal) - assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(0.0) + assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(0.0) } test("toSparkUTF8String: ASCII string") { diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala index 029104e5bf..aa2cdd0b2d 
100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala @@ -212,9 +212,9 @@ class FlussAsSparkArrayTest extends AnyFunSuite { val flussArray = new GenericArray(decimals) val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray) - assertThat(sparkArray.getDecimal(0, 10, 2).toBigDecimal.doubleValue()).isEqualTo(10.50) - assertThat(sparkArray.getDecimal(1, 10, 2).toBigDecimal.doubleValue()).isEqualTo(20.75) - assertThat(sparkArray.getDecimal(2, 10, 2).toBigDecimal.doubleValue()).isEqualTo(30.99) + assertThat(sparkArray.getDecimal(0, 10, 2).toBigDecimal.doubleValue).isEqualTo(10.50) + assertThat(sparkArray.getDecimal(1, 10, 2).toBigDecimal.doubleValue).isEqualTo(20.75) + assertThat(sparkArray.getDecimal(2, 10, 2).toBigDecimal.doubleValue).isEqualTo(30.99) } test("getUTF8String: read string array") { diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala index 03ccad4ef6..c3e19515a1 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala @@ -230,7 +230,7 @@ class FlussAsSparkRowTest extends AnyFunSuite { val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow) val sparkDecimal = sparkRow.getDecimal(0, 10, 2) - assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45) + assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(123.45) } test("getUTF8String: read string values") { @@ -479,6 +479,6 @@ class FlussAsSparkRowTest extends AnyFunSuite { assertThat(sparkRow.getFloat(5)).isEqualTo(3.14f) assertThat(sparkRow.getDouble(6)).isEqualTo(2.718) 
 assertThat(sparkRow.getUTF8String(7).toString).isEqualTo("test")
-    assertThat(sparkRow.getDecimal(8, 10, 2).toBigDecimal.doubleValue()).isEqualTo(99.99)
+    assertThat(sparkRow.getDecimal(8, 10, 2).toBigDecimal.doubleValue).isEqualTo(99.99)
   }
 }
diff --git a/pom.xml b/pom.xml
index 2e4ebd9e01..6b361c28fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -475,6 +475,14 @@
+        <profile>
+            <id>scala-2.13</id>
+            <properties>
+                <scala.binary.version>2.13</scala.binary.version>
+                <scala.version>${scala213.version}</scala.version>
+            </properties>
+        </profile>
+
         <profile>
             <id>release</id>