diff --git a/.github/workflows/ci-template.yaml b/.github/workflows/ci-template.yaml
index b23e3f0a1a..064c4ffaa6 100644
--- a/.github/workflows/ci-template.yaml
+++ b/.github/workflows/ci-template.yaml
@@ -36,7 +36,11 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        module: [ core, flink, spark3, lake ]
+        module: [ core, flink, spark3, spark3-scala213, lake ]
+        include:
+          - module: spark3-scala213
+            scala-profile: "-Pscala-2.13"
+            test-profile: "test-spark3"
     name: "${{ matrix.module }}"
     steps:
       - name: Checkout code
@@ -48,14 +52,14 @@ jobs:
           distribution: 'temurin'
       - name: Build
         run: |
-          mvn -T 1C -B clean install -DskipTests ${{ inputs.maven-parameters }}
+          mvn -T 1C -B clean install -DskipTests ${{ matrix.scala-profile }} ${{ inputs.maven-parameters }}
       - name: Test
         timeout-minutes: 60
         run: |
           TEST_MODULES=$(./.github/workflows/stage.sh ${{ matrix.module }})
           echo "github ref: ${{ github.ref }}"
           echo "Start testing modules: $TEST_MODULES"
-          mvn -B verify $TEST_MODULES -Ptest-coverage -Ptest-${{ matrix.module }} -Dlog.dir=${{ runner.temp }}/fluss-logs -Dlog4j.configurationFile=${{ github.workspace }}/tools/ci/log4j.properties ${{ inputs.maven-parameters }}
+          mvn -B verify $TEST_MODULES -Ptest-coverage -P${{ matrix.test-profile || format('test-{0}', matrix.module) }} ${{ matrix.scala-profile }} -Dlog.dir=${{ runner.temp }}/fluss-logs -Dlog4j.configurationFile=${{ github.workspace }}/tools/ci/log4j.properties ${{ inputs.maven-parameters }}
         env:
           MAVEN_OPTS: -Xmx4096m
           ARTIFACTS_OSS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_ENDPOINT }}
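
Note: for the new spark3-scala213 leg, the matrix include entry supplies scala-profile and test-profile, so the Build step gains -Pscala-2.13 and the Test step's ${{ matrix.test-profile || format('test-{0}', matrix.module) }} resolves to test-spark3; for the existing legs both matrix keys are unset, so those expressions fall back to an empty string and test-<module> respectively.
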
diff --git a/.github/workflows/stage.sh b/.github/workflows/stage.sh
index b35b74e47f..7fef1301dc 100755
--- a/.github/workflows/stage.sh
+++ b/.github/workflows/stage.sh
@@ -20,6 +20,7 @@
 STAGE_CORE="core"
 STAGE_FLINK="flink"
 STAGE_SPARK="spark3"
+STAGE_SPARK_SCALA213="spark3-scala213"
 STAGE_LAKE="lake"
 
 MODULES_FLINK="\
@@ -74,6 +75,9 @@ function get_test_modules_for_stage() {
         (${STAGE_SPARK})
             echo "-Pspark3 -pl fluss-test-coverage,$modules_spark3"
             ;;
+        (${STAGE_SPARK_SCALA213})
+            echo "-Pspark3,scala-2.13 -pl fluss-test-coverage,$modules_spark3"
+            ;;
         (${STAGE_LAKE})
             echo "-pl fluss-test-coverage,$modules_lake"
             ;;
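
Note: the new stage reuses $modules_spark3, so the Scala 2.13 leg compiles and tests the same Spark modules as the spark3 stage, only with the scala-2.13 Maven profile enabled alongside spark3.
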
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala
index 70d5ef45c0..ec944f2040 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala
@@ -102,14 +102,14 @@ class FlussLakeAppendBatch(
     val partitions = if (tableInfo.isPartitioned) {
       planPartitionedTable(
-        lakeSplits.asScala,
+        lakeSplits.asScala.toSeq,
         splitSerializer,
         tableBucketsOffset,
         buckets,
         bucketOffsetsRetriever)
     } else {
       planNonPartitionedTable(
-        lakeSplits.asScala,
+        lakeSplits.asScala.toSeq,
         splitSerializer,
         tableBucketsOffset,
         buckets,
         bucketOffsetsRetriever)
@@ -163,7 +163,7 @@ class FlussLakeAppendBatch(
       case Some(partitionId) =>
         // Partition in both lake and Fluss — lake splits + log tail
         val lakePartitions =
-          createLakePartitions(splits, splitSerializer, tableId, Some(partitionId))
+          createLakePartitions(splits.toSeq, splitSerializer, tableId, Some(partitionId))
 
         val stoppingOffsets = getBucketOffsets(
           stoppingOffsetsInitializer,
@@ -182,7 +182,7 @@ class FlussLakeAppendBatch(
       case None =>
         // Partition only in lake (expired in Fluss) — lake splits only
         val pid = lakeSplitPartitionId
         lakeSplitPartitionId -= 1
-        createLakePartitions(splits, splitSerializer, tableId, Some(pid))
     }
   }.toSeq
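
Note on the .toSeq changes above: asScala on a java.util.List yields a mutable.Buffer. Under Scala 2.12 the default Seq alias is scala.collection.Seq, which a Buffer satisfies; under 2.13 it is scala.collection.immutable.Seq, which a Buffer does not, so these calls stop compiling. A minimal sketch of the failure mode, assuming (as the fix suggests) that the plan* helpers take a standard Seq parameter; the plan function below is hypothetical, not Fluss code:

    import scala.collection.JavaConverters._

    // Hypothetical stand-in for planPartitionedTable / planNonPartitionedTable.
    def plan(splits: Seq[String]): Int = splits.size

    val javaSplits: java.util.List[String] = java.util.Arrays.asList("s0", "s1")

    // plan(javaSplits.asScala)      // 2.12: compiles; 2.13: type error,
    //                               // a Buffer is not an immutable.Seq
    plan(javaSplits.asScala.toSeq)   // compiles on both: essentially free on
                                     // 2.12, copies to an immutable Seq on 2.13

(JavaConverters is deprecated in 2.13 in favour of scala.jdk.CollectionConverters, but it still compiles on both versions, which is why cross-built code keeps it.)
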
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala
index 6866ba7b60..d1847de547 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala
@@ -77,21 +77,21 @@ class DataConverterTest extends AnyFunSuite {
     val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)
 
     assertThat(sparkDecimal).isInstanceOf(classOf[SparkDecimal])
-    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45)
+    assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(123.45)
   }
 
   test("toSparkDecimal: negative decimal") {
     val flussDecimal = FlussDecimal.fromBigDecimal(new java.math.BigDecimal("-999.99"), 5, 2)
     val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)
 
-    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(-999.99)
+    assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(-999.99)
   }
 
   test("toSparkDecimal: zero") {
     val flussDecimal = FlussDecimal.fromBigDecimal(java.math.BigDecimal.ZERO, 5, 2)
     val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)
 
-    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(0.0)
+    assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(0.0)
   }
 
   test("toSparkUTF8String: ASCII string") {
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala
index 029104e5bf..aa2cdd0b2d 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala
@@ -212,9 +212,9 @@ class FlussAsSparkArrayTest extends AnyFunSuite {
     val flussArray = new GenericArray(decimals)
     val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
 
-    assertThat(sparkArray.getDecimal(0, 10, 2).toBigDecimal.doubleValue()).isEqualTo(10.50)
-    assertThat(sparkArray.getDecimal(1, 10, 2).toBigDecimal.doubleValue()).isEqualTo(20.75)
-    assertThat(sparkArray.getDecimal(2, 10, 2).toBigDecimal.doubleValue()).isEqualTo(30.99)
+    assertThat(sparkArray.getDecimal(0, 10, 2).toBigDecimal.doubleValue).isEqualTo(10.50)
+    assertThat(sparkArray.getDecimal(1, 10, 2).toBigDecimal.doubleValue).isEqualTo(20.75)
+    assertThat(sparkArray.getDecimal(2, 10, 2).toBigDecimal.doubleValue).isEqualTo(30.99)
   }
 
   test("getUTF8String: read string array") {
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala
index 03ccad4ef6..c3e19515a1 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala
@@ -230,7 +230,7 @@ class FlussAsSparkRowTest extends AnyFunSuite {
     val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
     val sparkDecimal = sparkRow.getDecimal(0, 10, 2)
 
-    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45)
+    assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(123.45)
   }
 
   test("getUTF8String: read string values") {
@@ -479,6 +479,6 @@ class FlussAsSparkRowTest extends AnyFunSuite {
     assertThat(sparkRow.getFloat(5)).isEqualTo(3.14f)
     assertThat(sparkRow.getDouble(6)).isEqualTo(2.718)
     assertThat(sparkRow.getUTF8String(7).toString).isEqualTo("test")
-    assertThat(sparkRow.getDecimal(8, 10, 2).toBigDecimal.doubleValue()).isEqualTo(99.99)
+    assertThat(sparkRow.getDecimal(8, 10, 2).toBigDecimal.doubleValue).isEqualTo(99.99)
   }
 }
diff --git a/pom.xml b/pom.xml
index 2e4ebd9e01..6b361c28fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -475,6 +475,14 @@
         </profile>
 
+        <profile>
+            <id>scala-2.13</id>
+            <properties>
+                <scala.binary.version>2.13</scala.binary.version>
+                <scala.version>${scala213.version}</scala.version>
+            </properties>
+        </profile>
+
         <profile>
             <id>release</id>
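
With the profile in place, the CI matrix entry maps onto a local cross-build: mvn clean install -DskipTests -Pscala-2.13 mirrors the Build step above, and stage.sh appends the same profile for the test run.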