10 changes: 7 additions & 3 deletions .github/workflows/ci-template.yaml

@@ -36,7 +36,11 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        module: [ core, flink, spark3, lake ]
+        module: [ core, flink, spark3, spark3-scala213, lake ]
+        include:
+          - module: spark3-scala213
+            scala-profile: "-Pscala-2.13"
+            test-profile: "test-spark3"
     name: "${{ matrix.module }}"
     steps:
       - name: Checkout code
@@ -48,14 +52,14 @@
           distribution: 'temurin'
       - name: Build
         run: |
-          mvn -T 1C -B clean install -DskipTests ${{ inputs.maven-parameters }}
+          mvn -T 1C -B clean install -DskipTests ${{ matrix.scala-profile }} ${{ inputs.maven-parameters }}
       - name: Test
         timeout-minutes: 60
         run: |
           TEST_MODULES=$(./.github/workflows/stage.sh ${{ matrix.module }})
           echo "github ref: ${{ github.ref }}"
           echo "Start testing modules: $TEST_MODULES"
-          mvn -B verify $TEST_MODULES -Ptest-coverage -Ptest-${{ matrix.module }} -Dlog.dir=${{ runner.temp }}/fluss-logs -Dlog4j.configurationFile=${{ github.workspace }}/tools/ci/log4j.properties ${{ inputs.maven-parameters }}
+          mvn -B verify $TEST_MODULES -Ptest-coverage -P${{ matrix.test-profile || format('test-{0}', matrix.module) }} ${{ matrix.scala-profile }} -Dlog.dir=${{ runner.temp }}/fluss-logs -Dlog4j.configurationFile=${{ github.workspace }}/tools/ci/log4j.properties ${{ inputs.maven-parameters }}
         env:
           MAVEN_OPTS: -Xmx4096m
           ARTIFACTS_OSS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_ENDPOINT }}
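A note on how the matrix wiring above resolves: the `include` entry attaches `scala-profile` and `test-profile` only to the `spark3-scala213` job, and the expression `${{ matrix.test-profile || format('test-{0}', matrix.module) }}` relies on GitHub Actions' `||` operator, which returns its right-hand operand when the left is empty. The new job therefore gets `-Ptest-spark3`, reusing the existing Spark test profile, while every other module falls back to `test-<module>` exactly as before. Likewise, `${{ matrix.scala-profile }}` expands to an empty string for jobs that do not define it, so their Maven command lines are unchanged.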
4 changes: 4 additions & 0 deletions .github/workflows/stage.sh

@@ -20,6 +20,7 @@
 STAGE_CORE="core"
 STAGE_FLINK="flink"
 STAGE_SPARK="spark3"
+STAGE_SPARK_SCALA213="spark3-scala213"
 STAGE_LAKE="lake"

 MODULES_FLINK="\
@@ -74,6 +75,9 @@ function get_test_modules_for_stage() {
        (${STAGE_SPARK})
            echo "-Pspark3 -pl fluss-test-coverage,$modules_spark3"
            ;;
+       (${STAGE_SPARK_SCALA213})
+           echo "-Pspark3,scala-2.13 -pl fluss-test-coverage,$modules_spark3"
+           ;;
        (${STAGE_LAKE})
            echo "-pl fluss-test-coverage,$modules_lake"
            ;;
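With this mapping, `./.github/workflows/stage.sh spark3-scala213` emits `-Pspark3,scala-2.13 -pl fluss-test-coverage,$modules_spark3`: the same module list as the `spark3` stage, but with the `scala-2.13` Maven profile added, so both Scala lines exercise an identical test set.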
@@ -102,14 +102,14 @@ class FlussLakeAppendBatch(

     val partitions = if (tableInfo.isPartitioned) {
       planPartitionedTable(
-        lakeSplits.asScala,
+        lakeSplits.asScala.toSeq,
         splitSerializer,
         tableBucketsOffset,
         buckets,
         bucketOffsetsRetriever)
     } else {
       planNonPartitionedTable(
-        lakeSplits.asScala,
+        lakeSplits.asScala.toSeq,
         splitSerializer,
         tableBucketsOffset,
         buckets,
@@ -163,7 +163,7 @@ class FlussLakeAppendBatch(
       case Some(partitionId) =>
         // Partition in both lake and Fluss — lake splits + log tail
         val lakePartitions =
-          createLakePartitions(splits, splitSerializer, tableId, Some(partitionId))
+          createLakePartitions(splits.toSeq, splitSerializer, tableId, Some(partitionId))

         val stoppingOffsets = getBucketOffsets(
           stoppingOffsetsInitializer,
@@ -182,7 +182,7 @@ class FlussLakeAppendBatch(
         // Partition only in lake (expired in Fluss) — lake splits only
         val pid = lakeSplitPartitionId
         lakeSplitPartitionId -= 1
-        createLakePartitions(splits, splitSerializer, tableId, Some(pid))
+        createLakePartitions(splits.toSeq, splitSerializer, tableId, Some(pid))
       }
     }.toSeq

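The `.toSeq` calls added above are the cross-version fix in this file: Scala 2.13 redefined `scala.Seq` as an alias for `scala.collection.immutable.Seq`, while `asScala` still yields a mutable `Buffer`, so the wrapper no longer satisfies a `Seq` parameter without an explicit copy. A minimal sketch of the mismatch (the `javaSplits` value and `plan` signature are illustrative, not from the PR):

    import scala.collection.JavaConverters._ // compiles on 2.12 and 2.13 (deprecated in 2.13)

    def plan(splits: Seq[String]): Unit = println(splits.mkString(", "))

    val javaSplits: java.util.List[String] = java.util.Arrays.asList("split-0", "split-1")

    // asScala wraps the Java list as a scala.collection.mutable.Buffer.
    val buffer = javaSplits.asScala

    // Scala 2.12: Seq aliases scala.collection.Seq, so the Buffer is already a Seq
    // and plan(buffer) compiles. Scala 2.13: Seq aliases immutable.Seq, so the
    // Buffer must be converted first; .toSeq is a no-op on 2.12 and a copy on 2.13.
    plan(buffer.toSeq)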
@@ -77,21 +77,21 @@ class DataConverterTest extends AnyFunSuite {
     val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)

     assertThat(sparkDecimal).isInstanceOf(classOf[SparkDecimal])
-    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45)
+    assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(123.45)
   }

   test("toSparkDecimal: negative decimal") {
     val flussDecimal = FlussDecimal.fromBigDecimal(new java.math.BigDecimal("-999.99"), 5, 2)
     val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)

-    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(-999.99)
+    assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(-999.99)
   }

   test("toSparkDecimal: zero") {
     val flussDecimal = FlussDecimal.fromBigDecimal(java.math.BigDecimal.ZERO, 5, 2)
     val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)

-    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(0.0)
+    assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(0.0)
   }

   test("toSparkUTF8String: ASCII string") {
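Dropping the parentheses from `doubleValue()` is the same kind of fix, and it recurs in the two test diffs below. `toBigDecimal` returns a `scala.math.BigDecimal`, whose `doubleValue` is declared with empty parens in Scala 2.12 but as a parameterless method in 2.13, and Scala permits omitting parens on an empty-paren method but not adding them to a parameterless one. A minimal sketch:

    val bd: scala.math.BigDecimal = BigDecimal("123.45")

    // 2.12 declares: def doubleValue(): Double  -> bd.doubleValue() and bd.doubleValue both compile
    // 2.13 declares: def doubleValue: Double    -> bd.doubleValue() fails with
    //                "Double does not take parameters"
    val d: Double = bd.doubleValue // the paren-less form compiles on both versions
    assert(d == 123.45)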
@@ -212,9 +212,9 @@ class FlussAsSparkArrayTest extends AnyFunSuite {
     val flussArray = new GenericArray(decimals)
     val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)

-    assertThat(sparkArray.getDecimal(0, 10, 2).toBigDecimal.doubleValue()).isEqualTo(10.50)
-    assertThat(sparkArray.getDecimal(1, 10, 2).toBigDecimal.doubleValue()).isEqualTo(20.75)
-    assertThat(sparkArray.getDecimal(2, 10, 2).toBigDecimal.doubleValue()).isEqualTo(30.99)
+    assertThat(sparkArray.getDecimal(0, 10, 2).toBigDecimal.doubleValue).isEqualTo(10.50)
+    assertThat(sparkArray.getDecimal(1, 10, 2).toBigDecimal.doubleValue).isEqualTo(20.75)
+    assertThat(sparkArray.getDecimal(2, 10, 2).toBigDecimal.doubleValue).isEqualTo(30.99)
   }

   test("getUTF8String: read string array") {
@@ -230,7 +230,7 @@ class FlussAsSparkRowTest extends AnyFunSuite {
     val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)

     val sparkDecimal = sparkRow.getDecimal(0, 10, 2)
-    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45)
+    assertThat(sparkDecimal.toBigDecimal.doubleValue).isEqualTo(123.45)
   }

   test("getUTF8String: read string values") {
@@ -479,6 +479,6 @@ class FlussAsSparkRowTest extends AnyFunSuite {
     assertThat(sparkRow.getFloat(5)).isEqualTo(3.14f)
     assertThat(sparkRow.getDouble(6)).isEqualTo(2.718)
     assertThat(sparkRow.getUTF8String(7).toString).isEqualTo("test")
-    assertThat(sparkRow.getDecimal(8, 10, 2).toBigDecimal.doubleValue()).isEqualTo(99.99)
+    assertThat(sparkRow.getDecimal(8, 10, 2).toBigDecimal.doubleValue).isEqualTo(99.99)
   }
 }
8 changes: 8 additions & 0 deletions pom.xml

@@ -475,6 +475,14 @@
         </properties>
     </profile>

+    <profile>
+        <id>scala-2.13</id>
+        <properties>
+            <scala.binary.version>2.13</scala.binary.version>
+            <scala.version>${scala213.version}</scala.version>
+        </properties>
+    </profile>
+
     <profile>
         <!-- used for SNAPSHOT and regular releases -->
         <id>release</id>
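The profile above only overrides the two Scala properties; `scala213.version` is assumed to be defined in the pom's shared properties section, which sits outside this hunk. To reproduce the new CI job locally, the flags the workflow assembles should work: `mvn -T 1C clean install -DskipTests -Pscala-2.13`, then `mvn verify -Pspark3,scala-2.13 -Ptest-coverage -Ptest-spark3 -pl fluss-test-coverage,...`, where the trailing module list is the `$modules_spark3` expansion from stage.sh.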