diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java similarity index 82% rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java rename to fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java index 0caf10d55d..f25b878d37 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java @@ -16,15 +16,13 @@ * limitations under the License. */ -package org.apache.fluss.flink.lake.reader; +package org.apache.fluss.client.table.scanner.batch; import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.client.table.scanner.SortMergeReader; -import org.apache.fluss.client.table.scanner.batch.BatchScanner; import org.apache.fluss.client.table.scanner.log.LogScanner; import org.apache.fluss.client.table.scanner.log.ScanRecords; -import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.source.LakeSplit; import org.apache.fluss.lake.source.RecordReader; @@ -53,14 +51,14 @@ /** A scanner to merge the lakehouse's snapshot and change log. 
*/ public class LakeSnapshotAndLogSplitScanner implements BatchScanner { - private final LakeSnapshotAndFlussLogSplit lakeSnapshotSplitAndFlussLogSplit; + private final @Nullable List lakeSplits; private Comparator rowComparator; private List> lakeRecordIterators = new ArrayList<>(); private final LakeSource lakeSource; private final int[] pkIndexes; - // the indexes of primary key in emitted row by paimon and fluss + // the indexes of primary key in emitted row by lake and fluss private int[] keyIndexesInRow; @Nullable private int[] adjustProjectedFields; @@ -76,11 +74,15 @@ public class LakeSnapshotAndLogSplitScanner implements BatchScanner { public LakeSnapshotAndLogSplitScanner( Table table, LakeSource lakeSource, - LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit, + @Nullable List lakeSplits, + TableBucket tableBucket, + long startingOffset, + long stoppingOffset, @Nullable int[] projectedFields) { this.pkIndexes = table.getTableInfo().getSchema().getPrimaryKeyIndexes(); - this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit; + this.lakeSplits = lakeSplits; this.lakeSource = lakeSource; + this.stoppingOffset = stoppingOffset; int[] newProjectedFields = getNeedProjectFields(table, projectedFields); this.logScanner = table.newScan().project(newProjectedFields).createLogScanner(); @@ -89,29 +91,14 @@ public LakeSnapshotAndLogSplitScanner( .mapToObj(field -> new int[] {field}) .toArray(int[][]::new)); - TableBucket tableBucket = lakeSnapshotAndFlussLogSplit.getTableBucket(); if (tableBucket.getPartitionId() != null) { this.logScanner.subscribe( - tableBucket.getPartitionId(), - tableBucket.getBucket(), - lakeSnapshotAndFlussLogSplit.getStartingOffset()); + tableBucket.getPartitionId(), tableBucket.getBucket(), startingOffset); } else { - this.logScanner.subscribe( - tableBucket.getBucket(), lakeSnapshotAndFlussLogSplit.getStartingOffset()); + this.logScanner.subscribe(tableBucket.getBucket(), startingOffset); } - this.stoppingOffset = - 
lakeSnapshotAndFlussLogSplit - .getStoppingOffset() - .orElseThrow( - () -> - new RuntimeException( - "StoppingOffset is null for split: " - + lakeSnapshotAndFlussLogSplit)); - - this.logScanFinished = - lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset - || stoppingOffset <= 0; + this.logScanFinished = startingOffset >= stoppingOffset || stoppingOffset <= 0; } private int[] getNeedProjectFields(Table flussTable, @Nullable int[] projectedFields) { @@ -169,11 +156,10 @@ private int findIndex(int[] array, int target) { public CloseableIterator pollBatch(Duration timeout) throws IOException { if (logScanFinished) { if (lakeRecordIterators.isEmpty()) { - if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null - || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + if (lakeSplits == null || lakeSplits.isEmpty()) { lakeRecordIterators = Collections.emptyList(); } else { - for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { + for (LakeSplit lakeSplit : lakeSplits) { lakeRecordIterators.add( lakeSource.createRecordReader(() -> lakeSplit).read()); } @@ -195,12 +181,11 @@ public CloseableIterator pollBatch(Duration timeout) throws IOExcep } else { if (lakeRecordIterators.isEmpty()) { List recordReaders = new ArrayList<>(); - if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null - || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + if (lakeSplits == null || lakeSplits.isEmpty()) { // pass null split to get rowComparator recordReaders.add(lakeSource.createRecordReader(() -> null)); } else { - for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { + for (LakeSplit lakeSplit : lakeSplits) { recordReaders.add(lakeSource.createRecordReader(() -> lakeSplit)); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java index d48dbb7946..229f1428d9 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java @@ -19,7 +19,7 @@ import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.scanner.batch.BatchScanner; -import org.apache.fluss.flink.lake.reader.LakeSnapshotAndLogSplitScanner; +import org.apache.fluss.client.table.scanner.batch.LakeSnapshotAndLogSplitScanner; import org.apache.fluss.flink.lake.reader.LakeSnapshotScanner; import org.apache.fluss.flink.lake.reader.SeekableLakeSnapshotSplitScanner; import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; @@ -101,9 +101,23 @@ private BatchScanner getBatchScanner(LakeSnapshotAndFlussLogSplit lakeSplit) { lakeSplit.getCurrentLakeSplitIndex()); } else { + long stoppingOffset = + lakeSplit + .getStoppingOffset() + .orElseThrow( + () -> + new RuntimeException( + "StoppingOffset is null for split: " + + lakeSplit)); lakeBatchScanner = new LakeSnapshotAndLogSplitScanner( - table, lakeSource, lakeSplit, projectedFields); + table, + lakeSource, + lakeSplit.getLakeSplits(), + lakeSplit.getTableBucket(), + lakeSplit.getStartingOffset(), + stoppingOffset, + projectedFields); } return lakeBatchScanner; } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala index c2ad05081c..0a9d9663f6 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala @@ -21,7 +21,7 @@ import org.apache.fluss.client.admin.Admin import org.apache.fluss.config.{Configuration => FlussConfiguration} import 
org.apache.fluss.metadata.{TableInfo, TablePath} import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement} -import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussLakeAppendScanBuilder, FlussUpsertScanBuilder} +import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussLakeAppendScanBuilder, FlussLakeUpsertScanBuilder, FlussUpsertScanBuilder} import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBuilder} import org.apache.spark.sql.catalyst.SQLConfHelper @@ -75,7 +75,11 @@ class SparkTable( new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig) } } else { - new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig) + if (isDataLakeEnabled) { + new FlussLakeUpsertScanBuilder(tablePath, tableInfo, options, flussConfig) + } else { + new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig) + } } } } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala index ccf89cc87c..661f8779f4 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala @@ -45,24 +45,6 @@ case class FlussAppendInputPartition(tableBucket: TableBucket, startOffset: Long } } -/** - * Represents an input partition for reading data from a single lake split. Each lake split maps to - * one Spark task, enabling parallel lake reads across splits. 
- * - * @param tableBucket - * the table bucket this split belongs to - * @param lakeSplitBytes - * serialized lake split data - */ -case class FlussLakeInputPartition(tableBucket: TableBucket, lakeSplitBytes: Array[Byte]) - extends FlussInputPartition { - override def toString: String = { - s"FlussLakeInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," + - s" partitionId=${tableBucket.getPartitionId}," + - s" splitSize=${lakeSplitBytes.length}}" - } -} - /** * Represents an input partition for reading data from a primary key table bucket. This partition * includes snapshot information for hybrid snapshot-log reading. @@ -85,6 +67,6 @@ case class FlussUpsertInputPartition( override def toString: String = { s"FlussUpsertInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," + s" partitionId=${tableBucket.getPartitionId}, snapshotId=$snapshotId," + - s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset" + s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset}" } } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala index d4e14bd479..13cffe4fed 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala @@ -20,6 +20,7 @@ package org.apache.fluss.spark.read import org.apache.fluss.config.Configuration import org.apache.fluss.metadata.{TableInfo, TablePath} import org.apache.fluss.spark.SparkConversions +import org.apache.fluss.spark.read.lake.{FlussLakeAppendBatch, FlussLakeUpsertBatch} import org.apache.spark.sql.connector.read.{Batch, Scan} import org.apache.spark.sql.connector.read.streaming.MicroBatchStream @@ -108,3 +109,27 @@ case class FlussUpsertScan( checkpointLocation) } } + +/** 
Fluss Lake Upsert Scan for lake-enabled primary key tables. */ +case class FlussLakeUpsertScan( + tablePath: TablePath, + tableInfo: TableInfo, + requiredSchema: Option[StructType], + options: CaseInsensitiveStringMap, + flussConfig: Configuration) + extends FlussScan { + + override def toBatch: Batch = { + new FlussLakeUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig) + } + + override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { + new FlussUpsertMicroBatchStream( + tablePath, + tableInfo, + readSchema, + options, + flussConfig, + checkpointLocation) + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala index 9dd49f4df6..20d367ef46 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala @@ -72,3 +72,16 @@ class FlussUpsertScanBuilder( FlussUpsertScan(tablePath, tableInfo, requiredSchema, options, flussConfig) } } + +/** Fluss Lake Upsert Scan Builder for lake-enabled primary key tables. 
*/ +class FlussLakeUpsertScanBuilder( + tablePath: TablePath, + tableInfo: TableInfo, + options: CaseInsensitiveStringMap, + flussConfig: FlussConfiguration) + extends FlussScanBuilder { + + override def build(): Scan = { + FlussLakeUpsertScan(tablePath, tableInfo, requiredSchema, options, flussConfig) + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala similarity index 88% rename from fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala rename to fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala index 70d5ef45c0..384cb4bdbd 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.fluss.spark.read +package org.apache.fluss.spark.read.lake import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer} import org.apache.fluss.client.table.scanner.log.LogScanner @@ -24,6 +24,7 @@ import org.apache.fluss.exception.LakeTableSnapshotNotExistException import org.apache.fluss.lake.serializer.SimpleVersionedSerializer import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath} +import org.apache.fluss.spark.read._ import org.apache.fluss.utils.ExceptionUtils import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} @@ -40,19 +41,11 @@ class FlussLakeAppendBatch( readSchema: StructType, options: CaseInsensitiveStringMap, flussConfig: Configuration) - extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) { + extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) { - // Required by FlussBatch but unused — lake snapshot determines start offsets. + // Required by FlussLakeBatch but unused — lake snapshot determines start offsets. override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest() - override val stoppingOffsetsInitializer: OffsetsInitializer = { - FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, flussConfig) - } - - private lazy val (partitions, isFallback) = doPlan() - - override def planInputPartitions(): Array[InputPartition] = partitions - override def createReaderFactory(): PartitionReaderFactory = { if (isFallback) { new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig) @@ -66,11 +59,7 @@ class FlussLakeAppendBatch( } } - /** - * Plans input partitions for reading. The returned isFallback flag is true when no lake snapshot - * exists and the plan falls back to pure log reading. 
- */ - private def doPlan(): (Array[InputPartition], Boolean) = { + override def doPlan(): (Array[InputPartition], Boolean) = { val lakeSnapshot = try { admin.getReadableLakeSnapshot(tablePath).get() @@ -86,8 +75,8 @@ class FlussLakeAppendBatch( throw e } - val lakeSource = FlussLakeSourceUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) - lakeSource.withProject(FlussLakeSourceUtils.lakeProjection(projection)) + val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + lakeSource.withProject(FlussLakeUtils.lakeProjection(projection)) val lakeSplits = lakeSource .createPlanner(new LakeSource.PlannerContext { @@ -261,18 +250,6 @@ class FlussLakeAppendBatch( } } - private def getBucketOffsets( - initializer: OffsetsInitializer, - partitionName: String, - buckets: Seq[Int], - bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Map[Int, Long] = { - initializer - .getBucketOffsets(partitionName, buckets.map(Integer.valueOf).asJava, bucketOffsetsRetriever) - .asScala - .map(e => (e._1.intValue(), Long2long(e._2))) - .toMap - } - private def planFallbackPartitions(): Array[InputPartition] = { val fallbackStartInit = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig) val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala similarity index 92% rename from fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala rename to fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala index 9c0031409b..4fa526ed46 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala +++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.fluss.spark.read +package org.apache.fluss.spark.read.lake import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.TablePath @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.PartitionReader /** Partition reader that reads data from a single lake split via lake storage (no Fluss connection). */ -class FlussLakePartitionReader( +class FlussLakeAppendPartitionReader( tablePath: TablePath, rowType: RowType, partition: FlussLakeInputPartition, @@ -50,9 +50,7 @@ class FlussLakePartitionReader( val split = splitSerializer.deserialize(splitSerializer.getVersion, partition.lakeSplitBytes) recordIterator = lakeSource - .createRecordReader(new LakeSource.ReaderContext[LakeSplit] { - override def lakeSplit(): LakeSplit = split - }) + .createRecordReader(() => split) .read() } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReaderFactory.scala similarity index 84% rename from fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala rename to fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReaderFactory.scala index 18c592801b..809e68dbde 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReaderFactory.scala @@ -15,11 +15,12 @@ * limitations under the License. 
*/ -package org.apache.fluss.spark.read +package org.apache.fluss.spark.read.lake import org.apache.fluss.config.Configuration import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.TablePath +import org.apache.fluss.spark.read.{FlussAppendInputPartition, FlussAppendPartitionReader} import org.apache.fluss.types.RowType import org.apache.spark.sql.catalyst.InternalRow @@ -37,8 +38,8 @@ class FlussLakeAppendPartitionReaderFactory( extends PartitionReaderFactory { @transient private lazy val lakeSource: LakeSource[LakeSplit] = { - val source = FlussLakeSourceUtils.createLakeSource(tableProperties, tablePath) - source.withProject(FlussLakeSourceUtils.lakeProjection(projection)) + val source = FlussLakeUtils.createLakeSource(tableProperties, tablePath) + source.withProject(FlussLakeUtils.lakeProjection(projection)) source } @@ -47,7 +48,7 @@ class FlussLakeAppendPartitionReaderFactory( override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { partition match { case lake: FlussLakeInputPartition => - new FlussLakePartitionReader(tablePath, projectedRowType, lake, lakeSource) + new FlussLakeAppendPartitionReader(tablePath, projectedRowType, lake, lakeSource) case log: FlussAppendInputPartition => new FlussAppendPartitionReader(tablePath, projection, log, flussConfig) case _ => diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala new file mode 100644 index 0000000000..20ff096d52 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.read.lake + +import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer} +import org.apache.fluss.config.Configuration +import org.apache.fluss.metadata.{TableInfo, TablePath} +import org.apache.fluss.spark.read._ + +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ + +abstract class FlussLakeBatch( + tablePath: TablePath, + tableInfo: TableInfo, + readSchema: StructType, + options: CaseInsensitiveStringMap, + flussConfig: Configuration) + extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) { + + override val stoppingOffsetsInitializer: OffsetsInitializer = { + FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, flussConfig) + } + + lazy val (partitions, isFallback) = doPlan() + + override def planInputPartitions(): Array[InputPartition] = partitions + + /** + * Plans input partitions for reading. The returned isFallback flag is true when no lake snapshot + * exists and the plan falls back to pure log reading. 
+ */ + def doPlan(): (Array[InputPartition], Boolean) + + def getBucketOffsets( + initializer: OffsetsInitializer, + partitionName: String, + buckets: Seq[Int], + bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Map[Int, Long] = { + initializer + .getBucketOffsets(partitionName, buckets.map(Integer.valueOf).asJava, bucketOffsetsRetriever) + .asScala + .map(e => (e._1.intValue(), Long2long(e._2))) + .toMap + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeInputPartition.scala new file mode 100644 index 0000000000..6a32983011 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeInputPartition.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.read.lake + +import org.apache.fluss.metadata.TableBucket +import org.apache.fluss.spark.read.FlussInputPartition + +/** + * Represents an input partition for reading data from a single lake split. Each lake split maps to + * one Spark task, enabling parallel lake reads across splits. 
+ * + * @param tableBucket + * the table bucket this split belongs to + * @param lakeSplitBytes + * serialized lake split data + */ +case class FlussLakeInputPartition(tableBucket: TableBucket, lakeSplitBytes: Array[Byte]) + extends FlussInputPartition { + override def toString: String = { + s"FlussLakeInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," + + s" partitionId=${tableBucket.getPartitionId}," + + s" splitSize=${lakeSplitBytes.length}}" + } +} + +/** + * Represents an input partition for reading data from a lake-enabled primary key table bucket. This + * partition combines lake snapshot splits with Fluss log tail for hybrid lake-kv reading. + * + * @param tableBucket + * the table bucket to read from + * @param lakeSplitBytes + * serialized lake splits data (may be null if no lake snapshot) + * @param logStartingOffset + * the log offset where incremental reading should start + * @param logStoppingOffset + * the log offset where incremental reading should end + */ +case class FlussLakeUpsertInputPartition( + tableBucket: TableBucket, + lakeSplitBytes: Array[Byte], + logStartingOffset: Long, + logStoppingOffset: Long) + extends FlussInputPartition { + override def toString: String = { + s"FlussLakeUpsertInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," + + s" partitionId=${tableBucket.getPartitionId}," + + s" lakeSplitSize=${if (lakeSplitBytes != null) lakeSplitBytes.length else 0}," + + s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset}" + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala new file mode 100644 index 0000000000..ccbe35b2ae --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala @@ -0,0 +1,310 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.read.lake + +import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer, SnapshotOffsetsInitializer} +import org.apache.fluss.client.table.scanner.log.LogScanner +import org.apache.fluss.config.Configuration +import org.apache.fluss.exception.LakeTableSnapshotNotExistException +import org.apache.fluss.lake.serializer.SimpleVersionedSerializer +import org.apache.fluss.lake.source.LakeSplit +import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath} +import org.apache.fluss.spark.read._ +import org.apache.fluss.utils.ExceptionUtils + +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * Batch for reading lake-enabled primary key tables. Combines lake snapshot data with Fluss kv + * tail, merging them using sort-merge algorithm. 
+ */ +class FlussLakeUpsertBatch( + tablePath: TablePath, + tableInfo: TableInfo, + readSchema: StructType, + options: CaseInsensitiveStringMap, + flussConfig: Configuration) + extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) { + + override val startOffsetsInitializer: OffsetsInitializer = { + val offsetsInitializer = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig) + if (!offsetsInitializer.isInstanceOf[SnapshotOffsetsInitializer]) { + throw new UnsupportedOperationException("Upsert scan only support FULL startup mode.") + } + offsetsInitializer + } + + override def createReaderFactory(): PartitionReaderFactory = { + if (isFallback) { + new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig) + } else { + new FlussLakeUpsertPartitionReaderFactory( + tableInfo.getProperties.toMap, + tablePath, + projection, + flussConfig) + } + } + + override def doPlan(): (Array[InputPartition], Boolean) = { + val lakeSnapshot = + try { + admin.getReadableLakeSnapshot(tablePath).get() + } catch { + case e: Exception => + if ( + ExceptionUtils + .stripExecutionException(e) + .isInstanceOf[LakeTableSnapshotNotExistException] + ) { + return (planFallbackPartitions(), true) + } + throw e + } + + val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + lakeSource.withProject(FlussLakeUtils.lakeProjection(projection)) + + val lakeSplits = lakeSource + .createPlanner(() => lakeSnapshot.getSnapshotId) + .plan() + + val splitSerializer = lakeSource.getSplitSerializer + val tableBucketsOffset = lakeSnapshot.getTableBucketsOffset + val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath) + + val partitions = if (tableInfo.isPartitioned) { + planPartitionedTable( + lakeSplits.asScala, + splitSerializer, + tableBucketsOffset, + bucketOffsetsRetriever) + } else { + planNonPartitionedTable( + lakeSplits.asScala, + splitSerializer, + tableBucketsOffset, + 
bucketOffsetsRetriever) + } + + (partitions, false) + } + + private def planNonPartitionedTable( + lakeSplits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit], + tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], + bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = { + val tableId = tableInfo.getTableId + val buckets = (0 until tableInfo.getNumBuckets).toSeq + + val stoppingOffsets = + getBucketOffsets(stoppingOffsetsInitializer, null, buckets, bucketOffsetsRetriever) + + // Group lake splits by bucket + val lakeSplitsByBucket = lakeSplits.groupBy(_.bucket()).mapValues(_.toSeq).toMap + + buckets.map { + bucketId => + val tableBucket = new TableBucket(tableId, bucketId) + val snapshotLogOffset = tableBucketsOffset.get(tableBucket) + val stoppingOffset = stoppingOffsets(bucketId) + + createLakeUpsertPartition( + tableBucket, + lakeSplitsByBucket.get(bucketId), + splitSerializer, + snapshotLogOffset, + stoppingOffset) + }.toArray + } + + private def planPartitionedTable( + lakeSplits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit], + tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], + bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = { + val tableId = tableInfo.getTableId + val buckets = (0 until tableInfo.getNumBuckets).toSeq + + val flussPartitionIdByName = mutable.LinkedHashMap.empty[String, Long] + partitionInfos.asScala.foreach { + pi => flussPartitionIdByName(pi.getPartitionName) = pi.getPartitionId + } + + val lakeSplitsByPartition = groupLakeSplitsByPartition(lakeSplits) + var lakeSplitPartitionId = -1L + + val lakePartitions = lakeSplitsByPartition.flatMap { + case (partitionName, splitsByBucket) => + flussPartitionIdByName.remove(partitionName) match { + case Some(partitionId) => + // Partition in both lake and Fluss + val stoppingOffsets = getBucketOffsets( + stoppingOffsetsInitializer, + partitionName, + buckets, + 
bucketOffsetsRetriever) + + buckets.map { + bucketId => + val tableBucket = new TableBucket(tableId, partitionId, bucketId) + val snapshotLogOffset = tableBucketsOffset.get(tableBucket) + val stoppingOffset = stoppingOffsets(bucketId) + + createLakeUpsertPartition( + tableBucket, + splitsByBucket.get(bucketId), + splitSerializer, + snapshotLogOffset, + stoppingOffset) + } + + case None => + // Partition only in lake (expired in Fluss) - create partitions with dummy partition id + val pid = lakeSplitPartitionId + lakeSplitPartitionId -= 1 + + buckets.map { + bucketId => + val tableBucket = new TableBucket(tableId, pid, bucketId) + // For expired partitions, there's no log tail to read + createLakeUpsertPartition( + tableBucket, + splitsByBucket.get(bucketId), + splitSerializer, + null, // no log offset + -1L // no stopping offset + ) + } + } + } + + // Partitions only in Fluss (not yet tiered) - read from earliest + val flussOnlyPartitions = flussPartitionIdByName.flatMap { + case (partitionName, partitionId) => + val stoppingOffsets = getBucketOffsets( + stoppingOffsetsInitializer, + partitionName, + buckets, + bucketOffsetsRetriever) + + buckets.map { + bucketId => + val tableBucket = new TableBucket(tableId, partitionId, bucketId) + val stoppingOffset = stoppingOffsets(bucketId) + + // No lake snapshot for this bucket, read log from earliest + FlussLakeUpsertInputPartition( + tableBucket, + null, // no lake splits + LogScanner.EARLIEST_OFFSET, + stoppingOffset + ): InputPartition + } + } + + (lakePartitions ++ flussOnlyPartitions).toArray + } + + private def groupLakeSplitsByPartition( + lakeSplits: Seq[LakeSplit]): Map[String, mutable.Map[Int, Seq[LakeSplit]]] = { + val grouped = mutable.LinkedHashMap.empty[String, mutable.Map[Int, Seq[LakeSplit]]] + lakeSplits.foreach { + split => + val partitionName = if (split.partition() == null || split.partition().isEmpty) { + "" + } else { + 
split.partition().asScala.mkString(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR) + } + val bucketId = split.bucket() + val bucketMap = grouped.getOrElseUpdate(partitionName, mutable.Map.empty) + val splits = bucketMap.getOrElse(bucketId, Seq.empty) + bucketMap(bucketId) = splits :+ split + } + grouped.toMap + } + + private def createLakeUpsertPartition( + tableBucket: TableBucket, + lakeSplits: Option[Seq[LakeSplit]], + splitSerializer: SimpleVersionedSerializer[LakeSplit], + snapshotLogOffset: java.lang.Long, + stoppingOffset: Long): InputPartition = { + val (lakeSplitBytes, logStartingOffset) = + if (lakeSplits.isDefined && lakeSplits.get.nonEmpty) { + // Serialize all lake splits for this bucket into a single byte array + val serialized = FlussLakeUtils.serializeLakeSplits(lakeSplits.get, splitSerializer) + val startOffset = + if (snapshotLogOffset != null) snapshotLogOffset.longValue() + else LogScanner.EARLIEST_OFFSET + (serialized, startOffset) + } else { + // No lake splits for this bucket + (null, LogScanner.EARLIEST_OFFSET) + } + + FlussLakeUpsertInputPartition( + tableBucket, + lakeSplitBytes, + logStartingOffset, + stoppingOffset + ) + } + + private def planFallbackPartitions(): Array[InputPartition] = { + // Fallback to pure Fluss kv reading when no lake snapshot exists + val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath) + val buckets = (0 until tableInfo.getNumBuckets).toSeq + + def createPartitions( + partitionId: Option[Long], + partitionName: String): Array[InputPartition] = { + val stoppingOffsets = + getBucketOffsets(stoppingOffsetsInitializer, partitionName, buckets, bucketOffsetsRetriever) + + buckets.map { + bucketId => + val tableBucket = partitionId match { + case Some(pid) => new TableBucket(tableInfo.getTableId, pid, bucketId) + case None => new TableBucket(tableInfo.getTableId, bucketId) + } + // Use FlussUpsertInputPartition for fallback (reads from Fluss kv snapshot) + FlussUpsertInputPartition( + 
tableBucket, + -1L, // no snapshot + LogScanner.EARLIEST_OFFSET, + stoppingOffsets(bucketId) + ): InputPartition + }.toArray + } + + if (tableInfo.isPartitioned) { + partitionInfos.asScala.flatMap { + pi => createPartitions(Some(pi.getPartitionId), pi.getPartitionName) + }.toArray + } else { + createPartitions(None, null) + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala new file mode 100644 index 0000000000..21514809f5 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.read.lake + +import org.apache.fluss.client.table.scanner.batch.LakeSnapshotAndLogSplitScanner +import org.apache.fluss.config.Configuration +import org.apache.fluss.lake.source.{LakeSource, LakeSplit} +import org.apache.fluss.metadata.TablePath +import org.apache.fluss.row.InternalRow +import org.apache.fluss.spark.read.FlussPartitionReader + +import org.apache.spark.internal.Logging + +import scala.collection.JavaConverters._ + +/** + * Partition reader that reads lake-enabled primary key table data. + * + * Reads lake snapshot and Fluss log tail, merging them using sort-merge algorithm. For rows with + * the same primary key, log data takes precedence over lake snapshot data. + */ +class FlussLakeUpsertPartitionReader( + tablePath: TablePath, + lakeSource: LakeSource[LakeSplit], + projection: Array[Int], + flussPartition: FlussLakeUpsertInputPartition, + flussConfig: Configuration) + extends FlussPartitionReader(tablePath, flussConfig) + with Logging { + + private val lakeSplits = if (flussPartition.lakeSplitBytes != null) { + FlussLakeUtils.deserializeLakeSplits( + flussPartition.lakeSplitBytes, + lakeSource.getSplitSerializer) + } else { + null + } + private val flussLakeSnapshotAndLogSplitScanner = new LakeSnapshotAndLogSplitScanner( + table, + lakeSource, + lakeSplits, + flussPartition.tableBucket, + flussPartition.logStartingOffset, + flussPartition.logStoppingOffset, + projection) + private var mergedIterator: Iterator[InternalRow] = Iterator.empty + private var scanFinished = false + + initialize() + + override def next(): Boolean = { + if (closed || scanFinished) { + return false + } + + // Loop pollBatch until fetch valid records or scan is complete + while (true) { + if (mergedIterator.hasNext) { + currentRow = convertToSparkRow(mergedIterator.next()) + return true + } + + val flussRecords = flussLakeSnapshotAndLogSplitScanner.pollBatch(POLL_TIMEOUT) + if (flussRecords == null) { + scanFinished = true + return 
false + } + mergedIterator = flussRecords.asScala + } + false + } + + private def initialize(): Unit = { + val currentTs = System.currentTimeMillis() + logInfo(s"Prepare read lake-enabled pk table $tablePath $flussPartition") + + // Loop pollBatch until fetch valid records or scan is complete + var flussRecords = flussLakeSnapshotAndLogSplitScanner.pollBatch(POLL_TIMEOUT) + while (flussRecords != null && !flussRecords.hasNext) { + flussRecords = flussLakeSnapshotAndLogSplitScanner.pollBatch(POLL_TIMEOUT) + } + mergedIterator = if (flussRecords == null) { + scanFinished = true + Iterator.empty + } else { + flussRecords.asScala + } + val spend = (System.currentTimeMillis() - currentTs) / 1000 + logInfo(s"Initialize FlussLakeUpsertPartitionReader cost $spend(s)") + } + + override def close0(): Unit = { + if (mergedIterator != null) { + mergedIterator match { + case closeable: AutoCloseable => closeable.close() + case _ => // Do nothing + } + } + if (flussLakeSnapshotAndLogSplitScanner != null) { + flussLakeSnapshotAndLogSplitScanner.close() + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala new file mode 100644 index 0000000000..525e29ec7f --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.read.lake + +import org.apache.fluss.config.Configuration +import org.apache.fluss.lake.source.{LakeSource, LakeSplit} +import org.apache.fluss.metadata.TablePath + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} + +import java.util + +/** + * Factory for lake-enabled primary key table reads. Creates readers that merge lake snapshot with + * Fluss kv tail. + */ +class FlussLakeUpsertPartitionReaderFactory( + tableProperties: util.Map[String, String], + tablePath: TablePath, + projection: Array[Int], + flussConfig: Configuration) + extends PartitionReaderFactory { + + @transient private lazy val lakeSource: LakeSource[LakeSplit] = { + FlussLakeUtils.createLakeSource(tableProperties, tablePath) + } + + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + partition match { + case lakeUpsert: FlussLakeUpsertInputPartition => + new FlussLakeUpsertPartitionReader( + tablePath, + lakeSource, + projection, + lakeUpsert, + flussConfig) + case _ => + throw new IllegalArgumentException(s"Unexpected partition type: ${partition.getClass}") + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala similarity index 55% rename from 
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala rename to fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala index 41958c3442..a6dce8b08d 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala @@ -15,18 +15,20 @@ * limitations under the License. */ -package org.apache.fluss.spark.read +package org.apache.fluss.spark.read.lake import org.apache.fluss.config.{ConfigOptions, Configuration} import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp +import org.apache.fluss.lake.serializer.SimpleVersionedSerializer import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.TablePath import org.apache.fluss.utils.PropertiesUtils +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} import java.util +import java.util.ArrayList -/** Shared utilities for creating lake sources and projections. 
*/ -object FlussLakeSourceUtils { +object FlussLakeUtils { def createLakeSource( tableProperties: util.Map[String, String], @@ -46,4 +48,49 @@ object FlussLakeSourceUtils { def lakeProjection(projection: Array[Int]): Array[Array[Int]] = { projection.map(i => Array(i)) } + + def serializeLakeSplits( + lakeSplits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit]): Array[Byte] = { + val baos = new ByteArrayOutputStream() + val dos = new DataOutputStream(baos) + + // Write serializer version + dos.writeInt(splitSerializer.getVersion) + // Write number of splits + dos.writeInt(lakeSplits.size) + // Write each split + for (split <- lakeSplits) { + val serialized = splitSerializer.serialize(split) + dos.writeInt(serialized.length) + dos.write(serialized) + } + + dos.flush() + baos.toByteArray + } + + def deserializeLakeSplits( + bytes: Array[Byte], + splitSerializer: org.apache.fluss.lake.serializer.SimpleVersionedSerializer[LakeSplit]) + : java.util.List[LakeSplit] = { + val bais = new ByteArrayInputStream(bytes) + val dis = new DataInputStream(bais) + + // Read serializer version + val version = dis.readInt() + // Read number of splits + val numSplits = dis.readInt() + val splits = new ArrayList[LakeSplit](numSplits) + + // Read each split + for (_ <- 0 until numSplits) { + val length = dis.readInt() + val serialized = new Array[Byte](length) + dis.readFully(serialized) + splits.add(splitSerializer.deserialize(version, serialized)) + } + + splits + } } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala deleted file mode 100644 index fad9bcd902..0000000000 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or 
more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.spark.lake - -import org.apache.fluss.config.Configuration -import org.apache.fluss.metadata.DataLakeFormat - -import java.nio.file.Files - -class SparkLakeIcebergLogTableReadTest extends SparkLakeLogTableReadTestBase { - - override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.ICEBERG - - override protected def flussConf: Configuration = { - val conf = super.flussConf - conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString) - conf.setString("datalake.iceberg.type", "hadoop") - warehousePath = - Files.createTempDirectory("fluss-testing-iceberg-lake-read").resolve("warehouse").toString - conf.setString("datalake.iceberg.warehouse", warehousePath) - conf - } - - override protected def lakeCatalogConf: Configuration = { - val conf = new Configuration() - conf.setString("type", "hadoop") - conf.setString("warehouse", warehousePath) - conf - } -} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala similarity index 69% rename from 
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala rename to fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala index a238394616..d0127e1186 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala @@ -18,85 +18,14 @@ package org.apache.fluss.spark.lake import org.apache.fluss.config.{ConfigOptions, Configuration} -import org.apache.fluss.flink.tiering.LakeTieringJobBuilder -import org.apache.fluss.flink.tiering.source.TieringSourceOptions -import org.apache.fluss.metadata.{DataLakeFormat, TableBucket} -import org.apache.fluss.spark.FlussSparkTestBase +import org.apache.fluss.metadata.DataLakeFormat import org.apache.fluss.spark.SparkConnectorOptions.BUCKET_NUMBER -import org.apache.flink.api.common.RuntimeExecutionMode -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.spark.sql.Row -import java.time.Duration +import java.nio.file.Files -/** - * Base class for lake-enabled log table read tests. Subclasses provide the lake format config and - * lake catalog configuration. - */ -abstract class SparkLakeLogTableReadTestBase extends FlussSparkTestBase { - - protected var warehousePath: String = _ - - /** The lake format used by this test. */ - protected def dataLakeFormat: DataLakeFormat - - /** Lake catalog configuration specific to the format. */ - protected def lakeCatalogConf: Configuration - - private val TIERING_PARALLELISM = 2 - private val CHECKPOINT_INTERVAL_MS = 1000L - private val POLL_INTERVAL: Duration = Duration.ofMillis(500L) - private val SYNC_TIMEOUT: Duration = Duration.ofMinutes(2) - private val SYNC_POLL_INTERVAL_MS = 500L - - /** Tier all pending data for the given table to the lake. 
*/ - protected def tierToLake(tableName: String): Unit = { - val tableId = loadFlussTable(createTablePath(tableName)).getTableInfo.getTableId - - val execEnv = StreamExecutionEnvironment.getExecutionEnvironment - execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING) - execEnv.setParallelism(TIERING_PARALLELISM) - execEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS) - - val flussConfig = new Configuration(flussServer.getClientConfig) - flussConfig.set(TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL, POLL_INTERVAL) - - val jobClient = LakeTieringJobBuilder - .newBuilder( - execEnv, - flussConfig, - lakeCatalogConf, - new Configuration(), - dataLakeFormat.toString) - .build() - - try { - val tableBucket = new TableBucket(tableId, 0) - val deadline = System.currentTimeMillis() + SYNC_TIMEOUT.toMillis - var synced = false - while (!synced && System.currentTimeMillis() < deadline) { - try { - val replica = flussServer.waitAndGetLeaderReplica(tableBucket) - synced = replica.getLogTablet.getLakeTableSnapshotId >= 0 - } catch { - case _: Exception => - } - if (!synced) Thread.sleep(SYNC_POLL_INTERVAL_MS) - } - assert(synced, s"Bucket $tableBucket not synced to lake within $SYNC_TIMEOUT") - } finally { - jobClient.cancel().get() - } - } - - override protected def withTable(tableNames: String*)(f: => Unit): Unit = { - try { - f - } finally { - tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS $DEFAULT_DATABASE.$t")) - } - } +abstract class SparkLakeLogTableReadTest extends SparkLakeTableReadTestBase { test("Spark Lake Read: log table falls back when no lake snapshot") { withTable("t") { @@ -251,3 +180,46 @@ abstract class SparkLakeLogTableReadTestBase extends FlussSparkTestBase { } } } + +class SparkLakePaimonLogTableReadTest extends SparkLakeLogTableReadTest { + override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.PAIMON + + override protected def flussConf: Configuration = { + val conf = super.flussConf + conf.setString("datalake.format", 
DataLakeFormat.PAIMON.toString) + conf.setString("datalake.paimon.metastore", "filesystem") + conf.setString("datalake.paimon.cache-enabled", "false") + warehousePath = + Files.createTempDirectory("fluss-testing-lake-read").resolve("warehouse").toString + conf.setString("datalake.paimon.warehouse", warehousePath) + conf + } + + override protected def lakeCatalogConf: Configuration = { + val conf = new Configuration() + conf.setString("metastore", "filesystem") + conf.setString("warehouse", warehousePath) + conf + } +} + +class SparkLakeIcebergLogTableReadTest extends SparkLakeLogTableReadTest { + override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.ICEBERG + + override protected def flussConf: Configuration = { + val conf = super.flussConf + conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString) + conf.setString("datalake.iceberg.type", "hadoop") + warehousePath = + Files.createTempDirectory("fluss-testing-iceberg-lake-read").resolve("warehouse").toString + conf.setString("datalake.iceberg.warehouse", warehousePath) + conf + } + + override protected def lakeCatalogConf: Configuration = { + val conf = new Configuration() + conf.setString("type", "hadoop") + conf.setString("warehouse", warehousePath) + conf + } +} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala deleted file mode 100644 index 2e2b941f4b..0000000000 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.spark.lake - -import org.apache.fluss.config.Configuration -import org.apache.fluss.metadata.DataLakeFormat - -import java.nio.file.Files - -class SparkLakePaimonLogTableReadTest extends SparkLakeLogTableReadTestBase { - - override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.PAIMON - - override protected def flussConf: Configuration = { - val conf = super.flussConf - conf.setString("datalake.format", DataLakeFormat.PAIMON.toString) - conf.setString("datalake.paimon.metastore", "filesystem") - conf.setString("datalake.paimon.cache-enabled", "false") - warehousePath = - Files.createTempDirectory("fluss-testing-lake-read").resolve("warehouse").toString - conf.setString("datalake.paimon.warehouse", warehousePath) - conf - } - - override protected def lakeCatalogConf: Configuration = { - val conf = new Configuration() - conf.setString("metastore", "filesystem") - conf.setString("warehouse", warehousePath) - conf - } -} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala new file mode 100644 index 0000000000..e8a4d23f59 --- /dev/null +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala @@ -0,0 
+1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.lake + +import org.apache.fluss.config.{ConfigOptions, Configuration} +import org.apache.fluss.metadata.DataLakeFormat +import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_NUMBER, PRIMARY_KEY} + +import org.apache.spark.sql.Row + +import java.nio.file.Files + +/** + * Base class for lake-enabled primary key table read tests. Subclasses provide the lake format + * config and lake catalog configuration. 
+ */ +abstract class SparkLakePrimaryKeyTableReadTestBase extends SparkLakeTableReadTestBase { + + test("Spark Lake Read: pk table falls back when no lake snapshot") { + // Test non-partitioned table + withTable("t_non_partitioned") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_non_partitioned (id INT, name STRING, score INT) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${PRIMARY_KEY.key()}' = 'id', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_non_partitioned VALUES + |(1, "alice", 90), (2, "bob", 85), (3, "charlie", 95) + |""".stripMargin) + + checkAnswer( + sql(s"SELECT * FROM $DEFAULT_DATABASE.t_non_partitioned ORDER BY id"), + Row(1, "alice", 90) :: Row(2, "bob", 85) :: Row(3, "charlie", 95) :: Nil + ) + } + + // Test partitioned table + withTable("t_partitioned") { + sql( + s""" + |CREATE TABLE $DEFAULT_DATABASE.t_partitioned (id INT, name STRING, score INT, dt STRING) + | PARTITIONED BY (dt) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${PRIMARY_KEY.key()}' = 'id,dt', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_partitioned VALUES + |(1, "alice", 90, "2026-01-01"), + |(2, "bob", 85, "2026-01-01"), + |(3, "charlie", 95, "2026-01-02") + |""".stripMargin) + + checkAnswer( + sql(s"SELECT * FROM $DEFAULT_DATABASE.t_partitioned ORDER BY id"), + Row(1, "alice", 90, "2026-01-01") :: + Row(2, "bob", 85, "2026-01-01") :: + Row(3, "charlie", 95, "2026-01-02") :: Nil + ) + + // Test column projection with different order + checkAnswer( + sql(s"SELECT dt, name, id FROM $DEFAULT_DATABASE.t_partitioned ORDER BY id"), + Row("2026-01-01", "alice", 1) :: + Row("2026-01-01", "bob", 2) :: + Row("2026-01-02", "charlie", 3) :: Nil + ) + } + } + + test("Spark Lake 
Read: pk table lake-only (all data in lake, no kv tail)") { + // Test non-partitioned table + withTable("t_lake_only") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_lake_only (id INT, name STRING, score INT) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${PRIMARY_KEY.key()}' = 'id', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_lake_only VALUES + |(1, "alice", 90), (2, "bob", 85), (3, "charlie", 95) + |""".stripMargin) + + tierToLake("t_lake_only") + + checkAnswer( + sql(s"SELECT * FROM $DEFAULT_DATABASE.t_lake_only ORDER BY id"), + Row(1, "alice", 90) :: Row(2, "bob", 85) :: Row(3, "charlie", 95) :: Nil + ) + } + + // Test partitioned table + withTable("t_lake_only_partitioned") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t_lake_only_partitioned (id INT, name STRING, score INT, dt STRING) + | PARTITIONED BY (dt) + | TBLPROPERTIES ( + | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true, + | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s', + | '${PRIMARY_KEY.key()}' = 'id,dt', + | '${BUCKET_NUMBER.key()}' = 1) + |""".stripMargin) + + sql(s""" + |INSERT INTO $DEFAULT_DATABASE.t_lake_only_partitioned VALUES + |(1, "alice", 90, "2026-01-01"), + |(2, "bob", 85, "2026-01-01"), + |(3, "charlie", 95, "2026-01-02") + |""".stripMargin) + + tierToLake("t_lake_only_partitioned") + + checkAnswer( + sql(s"SELECT * FROM $DEFAULT_DATABASE.t_lake_only_partitioned ORDER BY id"), + Row(1, "alice", 90, "2026-01-01") :: + Row(2, "bob", 85, "2026-01-01") :: + Row(3, "charlie", 95, "2026-01-02") :: Nil + ) + + // Test column projection with different order + checkAnswer( + sql(s"SELECT dt, name, id FROM $DEFAULT_DATABASE.t_lake_only_partitioned ORDER BY id"), + Row("2026-01-01", "alice", 1) :: + Row("2026-01-01", "bob", 2) :: + Row("2026-01-02", "charlie", 3) :: Nil + ) + } + } + + test("Spark Lake Read: pk table 
union read (lake + kv tail)") {
+    // Non-partitioned table: data already tiered to the lake and fresh kv-tail
+    // data must both appear in a single union read.
+    withTable("t_union") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_union (id INT, name STRING, score INT)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             | '${PRIMARY_KEY.key()}' = 'id',
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+             |(1, "alice", 90), (2, "bob", 85), (3, "charlie", 95)
+             |""".stripMargin)
+
+      tierToLake("t_union")
+
+      // Rows written after tiering stay in the kv tail until the next tiering run.
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+             |(4, "david", 88), (5, "eve", 92)
+             |""".stripMargin)
+
+      // The union read must surface lake rows (1-3) and kv-tail rows (4-5).
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_union ORDER BY id"),
+        Seq(
+          Row(1, "alice", 90),
+          Row(2, "bob", 85),
+          Row(3, "charlie", 95),
+          Row(4, "david", 88),
+          Row(5, "eve", 92))
+      )
+    }
+
+    // Partitioned table: same union-read semantics, plus partition pruning.
+    withTable("t_union_partitioned") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_union_partitioned (id INT, name STRING, score INT, dt STRING)
+             | PARTITIONED BY (dt)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             | '${PRIMARY_KEY.key()}' = 'id,dt',
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union_partitioned VALUES
+             |(1, "alice", 90, "2026-01-01"),
+             |(2, "bob", 85, "2026-01-01"),
+             |(3, "charlie", 95, "2026-01-02")
+             |""".stripMargin)
+
+      tierToLake("t_union_partitioned")
+
+      // kv-tail rows: one in an existing partition, one in a brand-new partition.
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union_partitioned VALUES
+             |(4, "david", 88, "2026-01-01"),
+             |(5, "eve", 92, "2026-01-03")
+             |""".stripMargin)
+
+      // A partition filter must apply to both the lake side and the kv tail.
+      checkAnswer(
+        sql(s"""
+               |SELECT * FROM $DEFAULT_DATABASE.t_union_partitioned
+               |WHERE dt = '2026-01-01' ORDER BY id""".stripMargin),
+        Seq(
+          Row(1, "alice", 90, "2026-01-01"),
+          Row(2, "bob", 85, "2026-01-01"),
+          Row(4, "david", 88, "2026-01-01"))
+      )
+
+      // Unfiltered union read across all partitions.
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_union_partitioned ORDER BY id"),
+        Seq(
+          Row(1, "alice", 90, "2026-01-01"),
+          Row(2, "bob", 85, "2026-01-01"),
+          Row(3, "charlie", 95, "2026-01-02"),
+          Row(4, "david", 88, "2026-01-01"),
+          Row(5, "eve", 92, "2026-01-03"))
+      )
+
+      // Column projection in an order different from the table schema.
+      checkAnswer(
+        sql(s"SELECT dt, name, id FROM $DEFAULT_DATABASE.t_union_partitioned ORDER BY id"),
+        Seq(
+          Row("2026-01-01", "alice", 1),
+          Row("2026-01-01", "bob", 2),
+          Row("2026-01-02", "charlie", 3),
+          Row("2026-01-01", "david", 4),
+          Row("2026-01-03", "eve", 5))
+      )
+    }
+  }
+
+  test("Spark Lake Read: pk table union read with updates") {
+    // Non-partitioned table: a kv-tail upsert must shadow the tiered lake row
+    // carrying the same primary key.
+    withTable("t_update") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_update (id INT, name STRING, score INT)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             | '${PRIMARY_KEY.key()}' = 'id',
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_update VALUES
+             |(1, "alice", 90), (2, "bob", 85), (3, "charlie", 95)
+             |""".stripMargin)
+
+      tierToLake("t_update")
+
+      // Upsert an existing key (id=2) and add a new one (id=4) into the kv tail.
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_update VALUES
+             |(2, "bob_updated", 100), (4, "david", 88)
+             |""".stripMargin)
+
+      // id=2 must come from the kv tail, not from the stale lake snapshot.
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_update ORDER BY id"),
+        Seq(
+          Row(1, "alice", 90),
+          Row(2, "bob_updated", 100),
+          Row(3, "charlie", 95),
+          Row(4, "david", 88))
+      )
+    }
+
+    // Partitioned table: same shadowing semantics per (id, dt) key.
+    withTable("t_update_partitioned") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_update_partitioned (id INT, name STRING, score INT, dt STRING)
+             | PARTITIONED BY (dt)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             | '${PRIMARY_KEY.key()}' = 'id,dt',
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_update_partitioned VALUES
+             |(1, "alice", 90, "2026-01-01"),
+             |(2, "bob", 85, "2026-01-01"),
+             |(3, "charlie", 95, "2026-01-02")
+             |""".stripMargin)
+
+      tierToLake("t_update_partitioned")
+
+      // Upsert an existing key and insert a new one after tiering.
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_update_partitioned VALUES
+             |(2, "bob_updated", 100, "2026-01-01"),
+             |(4, "david", 88, "2026-01-02")
+             |""".stripMargin)
+
+      // The kv-tail value for id=2 must win over the lake snapshot.
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_update_partitioned ORDER BY id"),
+        Seq(
+          Row(1, "alice", 90, "2026-01-01"),
+          Row(2, "bob_updated", 100, "2026-01-01"),
+          Row(3, "charlie", 95, "2026-01-02"),
+          Row(4, "david", 88, "2026-01-02"))
+      )
+
+      // Column projection in an order different from the table schema.
+      checkAnswer(
+        sql(s"SELECT dt, name, id FROM $DEFAULT_DATABASE.t_update_partitioned ORDER BY id"),
+        Seq(
+          Row("2026-01-01", "alice", 1),
+          Row("2026-01-01", "bob_updated", 2),
+          Row("2026-01-02", "charlie", 3),
+          Row("2026-01-02", "david", 4))
+      )
+    }
+  }
+}
+
+/** Runs the primary-key lake read tests against a Paimon-backed lake. */
+class SparkLakePaimonPrimaryKeyTableReadTest extends SparkLakePrimaryKeyTableReadTestBase {
+
+  override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.PAIMON
+
+  override protected def flussConf: Configuration = {
+    val conf = super.flussConf
+    conf.setString("datalake.format", DataLakeFormat.PAIMON.toString)
+    conf.setString("datalake.paimon.metastore", "filesystem")
+    conf.setString("datalake.paimon.cache-enabled", "false")
+    // A fresh warehouse directory per suite keeps runs isolated from each other.
+    warehousePath =
+      Files.createTempDirectory("fluss-testing-paimon-pk-lake-read").resolve("warehouse").toString
+    conf.setString("datalake.paimon.warehouse", warehousePath)
+    conf
+  }
+
+  override protected def lakeCatalogConf: Configuration = {
+    val conf = new Configuration()
+    conf.setString("metastore", "filesystem")
+    conf.setString("warehouse", warehousePath)
+    conf
+  }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala
new file mode 100644
index 0000000000..f8a5b65b2c
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.flink.tiering.LakeTieringJobBuilder
+import org.apache.fluss.flink.tiering.source.TieringSourceOptions
+import org.apache.fluss.metadata.{DataLakeFormat, TableBucket}
+import org.apache.fluss.spark.FlussSparkTestBase
+
+import org.apache.flink.api.common.RuntimeExecutionMode
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+import java.time.Duration
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Base class for lake-enabled table read tests. Subclasses provide the lake format config and lake
+ * catalog configuration.
+ */
+abstract class SparkLakeTableReadTestBase extends FlussSparkTestBase {
+
+  // Warehouse directory; assigned by the concrete subclass when it builds its fluss config.
+  protected var warehousePath: String = _
+
+  /** The lake format used by this test. */
+  protected def dataLakeFormat: DataLakeFormat
+
+  /** Lake catalog configuration specific to the format. */
+  protected def lakeCatalogConf: Configuration
+
+  private val TIERING_PARALLELISM = 2
+  private val CHECKPOINT_INTERVAL_MS = 1000L
+  private val POLL_INTERVAL: Duration = Duration.ofMillis(500L)
+  private val SYNC_TIMEOUT: Duration = Duration.ofMinutes(2)
+  private val SYNC_POLL_INTERVAL_MS = 500L
+
+  /**
+   * Tier all pending data for the given table to the lake.
+   *
+   * Starts a streaming tiering job, then polls the leader replica of every bucket until each one
+   * reports a non-negative lake snapshot id, failing after [[SYNC_TIMEOUT]]. The tiering job is
+   * always cancelled before returning.
+   */
+  protected def tierToLake(tableName: String): Unit = {
+    val tablePath = createTablePath(tableName)
+    val tableInfo = loadFlussTable(tablePath).getTableInfo
+    val tableId = tableInfo.getTableId
+    val numBuckets = tableInfo.getNumBuckets
+
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
+    execEnv.setParallelism(TIERING_PARALLELISM)
+    execEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS)
+
+    val flussConfig = new Configuration(flussServer.getClientConfig)
+    flussConfig.set(TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL, POLL_INTERVAL)
+
+    val jobClient = LakeTieringJobBuilder
+      .newBuilder(
+        execEnv,
+        flussConfig,
+        lakeCatalogConf,
+        new Configuration(),
+        dataLakeFormat.toString)
+      .build()
+
+    try {
+      // Collect all buckets to wait for sync.
+      val tableBuckets = if (tableInfo.isPartitioned) {
+        // For a partitioned table, every (partition, bucket) pair must be tiered.
+        val partitionInfos = admin.listPartitionInfos(tablePath).get()
+        partitionInfos.asScala.flatMap {
+          partitionInfo =>
+            (0 until numBuckets).map {
+              bucket => new TableBucket(tableId, partitionInfo.getPartitionId, bucket)
+            }
+        }.toSet
+      } else {
+        // For non-partitioned table, just use bucket 0
+        Set(new TableBucket(tableId, 0))
+      }
+
+      // Use the monotonic clock for the deadline: System.currentTimeMillis() can jump with
+      // wall-clock adjustments (NTP, manual changes), which would corrupt the timeout.
+      val deadlineNanos = System.nanoTime() + SYNC_TIMEOUT.toNanos
+      val syncedBuckets = scala.collection.mutable.Set[TableBucket]()
+
+      while (syncedBuckets.size < tableBuckets.size && System.nanoTime() < deadlineNanos) {
+        tableBuckets.foreach {
+          tableBucket =>
+            if (!syncedBuckets.contains(tableBucket)) {
+              try {
+                val replica = flussServer.waitAndGetLeaderReplica(tableBucket)
+                // A non-negative lake snapshot id means this bucket has been tiered.
+                if (replica.getLogTablet.getLakeTableSnapshotId >= 0) {
+                  syncedBuckets.add(tableBucket)
+                }
+              } catch {
+                // Best-effort poll: leadership may still be moving; retry until deadline.
+                case _: Exception =>
+              }
+            }
+        }
+        if (syncedBuckets.size < tableBuckets.size) {
+          Thread.sleep(SYNC_POLL_INTERVAL_MS)
+        }
+      }
+
+      assert(
+        syncedBuckets.size == tableBuckets.size,
+        s"Not all buckets synced to lake within $SYNC_TIMEOUT. " +
+          s"Synced: ${syncedBuckets.size}, Total: ${tableBuckets.size}"
+      )
+    } finally {
+      jobClient.cancel().get()
+    }
+  }
+
+  /** Runs `f` and always drops the given tables afterwards, even if `f` fails. */
+  override protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try {
+      f
+    } finally {
+      tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS $DEFAULT_DATABASE.$t"))
+    }
+  }
+
+}