From acf7c5ac84d7c31bdb536e3176dd09eec881326c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Thu, 9 Apr 2026 14:32:02 +0800 Subject: [PATCH 1/7] add pk table ut and move to lake dir --- .../LakeSnapshotAndLogSplitScanner.java | 47 +- .../flink/lake/LakeSplitReaderGenerator.java | 18 +- .../org/apache/fluss/spark/SparkTable.scala | 8 +- .../spark/read/FlussInputPartition.scala | 29 +- .../apache/fluss/spark/read/FlussScan.scala | 25 + .../fluss/spark/read/FlussScanBuilder.scala | 13 + .../read/{ => lake}/FlussLakeBatch.scala | 3 +- .../read/lake/FlussLakeInputPartition.scala | 66 +++ .../{ => lake}/FlussLakePartitionReader.scala | 3 +- .../FlussLakePartitionReaderFactory.scala | 3 +- .../{ => lake}/FlussLakeSourceUtils.scala | 2 +- .../read/lake/FlussLakeUpsertBatch.scala | 365 +++++++++++++++ .../lake/FlussLakeUpsertPartitionReader.scala | 143 ++++++ ...lussLakeUpsertPartitionReaderFactory.scala | 58 +++ .../fluss/spark/FlussSparkTestBase.scala | 1 + .../SparkLakeIcebergPkTableReadTest.scala | 45 ++ .../lake/SparkLakePaimonPkTableReadTest.scala | 46 ++ .../lake/SparkLakePkTableReadTestBase.scala | 433 ++++++++++++++++++ 18 files changed, 1268 insertions(+), 40 deletions(-) rename {fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader => fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch}/LakeSnapshotAndLogSplitScanner.java (82%) rename fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/{ => lake}/FlussLakeBatch.scala (99%) create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeInputPartition.scala rename fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/{ => lake}/FlussLakePartitionReader.scala (96%) rename fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/{ => lake}/FlussLakePartitionReaderFactory.scala (93%) rename 
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/{ => lake}/FlussLakeSourceUtils.scala (98%) create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala create mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergPkTableReadTest.scala create mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonPkTableReadTest.scala create mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePkTableReadTestBase.scala diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java similarity index 82% rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java rename to fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java index 0caf10d55d..f25b878d37 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LakeSnapshotAndLogSplitScanner.java @@ -16,15 +16,13 @@ * limitations under the License. 
*/ -package org.apache.fluss.flink.lake.reader; +package org.apache.fluss.client.table.scanner.batch; import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.client.table.scanner.SortMergeReader; -import org.apache.fluss.client.table.scanner.batch.BatchScanner; import org.apache.fluss.client.table.scanner.log.LogScanner; import org.apache.fluss.client.table.scanner.log.ScanRecords; -import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.source.LakeSplit; import org.apache.fluss.lake.source.RecordReader; @@ -53,14 +51,14 @@ /** A scanner to merge the lakehouse's snapshot and change log. */ public class LakeSnapshotAndLogSplitScanner implements BatchScanner { - private final LakeSnapshotAndFlussLogSplit lakeSnapshotSplitAndFlussLogSplit; + private final @Nullable List lakeSplits; private Comparator rowComparator; private List> lakeRecordIterators = new ArrayList<>(); private final LakeSource lakeSource; private final int[] pkIndexes; - // the indexes of primary key in emitted row by paimon and fluss + // the indexes of primary key in emitted row by lake and fluss private int[] keyIndexesInRow; @Nullable private int[] adjustProjectedFields; @@ -76,11 +74,15 @@ public class LakeSnapshotAndLogSplitScanner implements BatchScanner { public LakeSnapshotAndLogSplitScanner( Table table, LakeSource lakeSource, - LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit, + @Nullable List lakeSplits, + TableBucket tableBucket, + long startingOffset, + long stoppingOffset, @Nullable int[] projectedFields) { this.pkIndexes = table.getTableInfo().getSchema().getPrimaryKeyIndexes(); - this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit; + this.lakeSplits = lakeSplits; this.lakeSource = lakeSource; + this.stoppingOffset = stoppingOffset; int[] newProjectedFields = getNeedProjectFields(table, 
projectedFields); this.logScanner = table.newScan().project(newProjectedFields).createLogScanner(); @@ -89,29 +91,14 @@ public LakeSnapshotAndLogSplitScanner( .mapToObj(field -> new int[] {field}) .toArray(int[][]::new)); - TableBucket tableBucket = lakeSnapshotAndFlussLogSplit.getTableBucket(); if (tableBucket.getPartitionId() != null) { this.logScanner.subscribe( - tableBucket.getPartitionId(), - tableBucket.getBucket(), - lakeSnapshotAndFlussLogSplit.getStartingOffset()); + tableBucket.getPartitionId(), tableBucket.getBucket(), startingOffset); } else { - this.logScanner.subscribe( - tableBucket.getBucket(), lakeSnapshotAndFlussLogSplit.getStartingOffset()); + this.logScanner.subscribe(tableBucket.getBucket(), startingOffset); } - this.stoppingOffset = - lakeSnapshotAndFlussLogSplit - .getStoppingOffset() - .orElseThrow( - () -> - new RuntimeException( - "StoppingOffset is null for split: " - + lakeSnapshotAndFlussLogSplit)); - - this.logScanFinished = - lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset - || stoppingOffset <= 0; + this.logScanFinished = startingOffset >= stoppingOffset || stoppingOffset <= 0; } private int[] getNeedProjectFields(Table flussTable, @Nullable int[] projectedFields) { @@ -169,11 +156,10 @@ private int findIndex(int[] array, int target) { public CloseableIterator pollBatch(Duration timeout) throws IOException { if (logScanFinished) { if (lakeRecordIterators.isEmpty()) { - if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null - || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + if (lakeSplits == null || lakeSplits.isEmpty()) { lakeRecordIterators = Collections.emptyList(); } else { - for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { + for (LakeSplit lakeSplit : lakeSplits) { lakeRecordIterators.add( lakeSource.createRecordReader(() -> lakeSplit).read()); } @@ -195,12 +181,11 @@ public CloseableIterator pollBatch(Duration timeout) throws IOExcep } else { if 
(lakeRecordIterators.isEmpty()) { List recordReaders = new ArrayList<>(); - if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null - || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) { + if (lakeSplits == null || lakeSplits.isEmpty()) { // pass null split to get rowComparator recordReaders.add(lakeSource.createRecordReader(() -> null)); } else { - for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) { + for (LakeSplit lakeSplit : lakeSplits) { recordReaders.add(lakeSource.createRecordReader(() -> lakeSplit)); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java index d48dbb7946..229f1428d9 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java @@ -19,7 +19,7 @@ import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.scanner.batch.BatchScanner; -import org.apache.fluss.flink.lake.reader.LakeSnapshotAndLogSplitScanner; +import org.apache.fluss.client.table.scanner.batch.LakeSnapshotAndLogSplitScanner; import org.apache.fluss.flink.lake.reader.LakeSnapshotScanner; import org.apache.fluss.flink.lake.reader.SeekableLakeSnapshotSplitScanner; import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; @@ -101,9 +101,23 @@ private BatchScanner getBatchScanner(LakeSnapshotAndFlussLogSplit lakeSplit) { lakeSplit.getCurrentLakeSplitIndex()); } else { + long stoppingOffset = + lakeSplit + .getStoppingOffset() + .orElseThrow( + () -> + new RuntimeException( + "StoppingOffset is null for split: " + + lakeSplit)); lakeBatchScanner = new LakeSnapshotAndLogSplitScanner( - table, lakeSource, lakeSplit, projectedFields); + table, + lakeSource, + 
lakeSplit.getLakeSplits(), + lakeSplit.getTableBucket(), + lakeSplit.getStartingOffset(), + stoppingOffset, + projectedFields); } return lakeBatchScanner; } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala index c2ad05081c..0a9d9663f6 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala @@ -21,7 +21,7 @@ import org.apache.fluss.client.admin.Admin import org.apache.fluss.config.{Configuration => FlussConfiguration} import org.apache.fluss.metadata.{TableInfo, TablePath} import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement} -import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussLakeAppendScanBuilder, FlussUpsertScanBuilder} +import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussLakeAppendScanBuilder, FlussLakeUpsertScanBuilder, FlussUpsertScanBuilder} import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBuilder} import org.apache.spark.sql.catalyst.SQLConfHelper @@ -75,7 +75,11 @@ class SparkTable( new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig) } } else { - new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig) + if (isDataLakeEnabled) { + new FlussLakeUpsertScanBuilder(tablePath, tableInfo, options, flussConfig) + } else { + new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig) + } } } } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala index ccf89cc87c..9ae5bf1f52 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala +++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala @@ -85,6 +85,33 @@ case class FlussUpsertInputPartition( override def toString: String = { s"FlussUpsertInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," + s" partitionId=${tableBucket.getPartitionId}, snapshotId=$snapshotId," + - s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset" + s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset}" + } +} + +/** + * Represents an input partition for reading data from a lake-enabled primary key table bucket. This + * partition combines lake snapshot splits with Fluss log tail for hybrid lake-kv reading. + * + * @param tableBucket + * the table bucket to read from + * @param lakeSplitBytes + * serialized lake splits data (may be null if no lake snapshot) + * @param logStartingOffset + * the log offset where incremental reading should start + * @param logStoppingOffset + * the log offset where incremental reading should end + */ +case class FlussLakeUpsertInputPartition( + tableBucket: TableBucket, + lakeSplitBytes: Array[Byte], + logStartingOffset: Long, + logStoppingOffset: Long) + extends FlussInputPartition { + override def toString: String = { + s"FlussLakeUpsertInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," + + s" partitionId=${tableBucket.getPartitionId}," + + s" lakeSplitSize=${if (lakeSplitBytes != null) lakeSplitBytes.length else 0}," + + s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset}" } } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala index d4e14bd479..13cffe4fed 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala +++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala @@ -20,6 +20,7 @@ package org.apache.fluss.spark.read import org.apache.fluss.config.Configuration import org.apache.fluss.metadata.{TableInfo, TablePath} import org.apache.fluss.spark.SparkConversions +import org.apache.fluss.spark.read.lake.{FlussLakeAppendBatch, FlussLakeUpsertBatch} import org.apache.spark.sql.connector.read.{Batch, Scan} import org.apache.spark.sql.connector.read.streaming.MicroBatchStream @@ -108,3 +109,27 @@ case class FlussUpsertScan( checkpointLocation) } } + +/** Fluss Lake Upsert Scan for lake-enabled primary key tables. */ +case class FlussLakeUpsertScan( + tablePath: TablePath, + tableInfo: TableInfo, + requiredSchema: Option[StructType], + options: CaseInsensitiveStringMap, + flussConfig: Configuration) + extends FlussScan { + + override def toBatch: Batch = { + new FlussLakeUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig) + } + + override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { + new FlussUpsertMicroBatchStream( + tablePath, + tableInfo, + readSchema, + options, + flussConfig, + checkpointLocation) + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala index 9dd49f4df6..20d367ef46 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala @@ -72,3 +72,16 @@ class FlussUpsertScanBuilder( FlussUpsertScan(tablePath, tableInfo, requiredSchema, options, flussConfig) } } + +/** Fluss Lake Upsert Scan Builder for lake-enabled primary key tables. 
*/ +class FlussLakeUpsertScanBuilder( + tablePath: TablePath, + tableInfo: TableInfo, + options: CaseInsensitiveStringMap, + flussConfig: FlussConfiguration) + extends FlussScanBuilder { + + override def build(): Scan = { + FlussLakeUpsertScan(tablePath, tableInfo, requiredSchema, options, flussConfig) + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala similarity index 99% rename from fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala rename to fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala index 70d5ef45c0..a7b11c8760 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.fluss.spark.read +package org.apache.fluss.spark.read.lake import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer} import org.apache.fluss.client.table.scanner.log.LogScanner @@ -24,6 +24,7 @@ import org.apache.fluss.exception.LakeTableSnapshotNotExistException import org.apache.fluss.lake.serializer.SimpleVersionedSerializer import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath} +import org.apache.fluss.spark.read._ import org.apache.fluss.utils.ExceptionUtils import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeInputPartition.scala new file mode 100644 index 0000000000..6a32983011 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeInputPartition.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.read.lake + +import org.apache.fluss.metadata.TableBucket +import org.apache.fluss.spark.read.FlussInputPartition + +/** + * Represents an input partition for reading data from a single lake split. Each lake split maps to + * one Spark task, enabling parallel lake reads across splits. + * + * @param tableBucket + * the table bucket this split belongs to + * @param lakeSplitBytes + * serialized lake split data + */ +case class FlussLakeInputPartition(tableBucket: TableBucket, lakeSplitBytes: Array[Byte]) + extends FlussInputPartition { + override def toString: String = { + s"FlussLakeInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," + + s" partitionId=${tableBucket.getPartitionId}," + + s" splitSize=${lakeSplitBytes.length}}" + } +} + +/** + * Represents an input partition for reading data from a lake-enabled primary key table bucket. This + * partition combines lake snapshot splits with Fluss log tail for hybrid lake-kv reading. 
+ * + * @param tableBucket + * the table bucket to read from + * @param lakeSplitBytes + * serialized lake splits data (may be null if no lake snapshot) + * @param logStartingOffset + * the log offset where incremental reading should start + * @param logStoppingOffset + * the log offset where incremental reading should end + */ +case class FlussLakeUpsertInputPartition( + tableBucket: TableBucket, + lakeSplitBytes: Array[Byte], + logStartingOffset: Long, + logStoppingOffset: Long) + extends FlussInputPartition { + override def toString: String = { + s"FlussLakeUpsertInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," + + s" partitionId=${tableBucket.getPartitionId}," + + s" lakeSplitSize=${if (lakeSplitBytes != null) lakeSplitBytes.length else 0}," + + s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset}" + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReader.scala similarity index 96% rename from fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala rename to fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReader.scala index 9c0031409b..ebd2f37602 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReader.scala @@ -15,11 +15,12 @@ * limitations under the License. 
*/ -package org.apache.fluss.spark.read +package org.apache.fluss.spark.read.lake import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.TablePath import org.apache.fluss.record.LogRecord +import org.apache.fluss.spark.read.FlussLakeInputPartition import org.apache.fluss.spark.row.DataConverter import org.apache.fluss.types.RowType import org.apache.fluss.utils.CloseableIterator diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala similarity index 93% rename from fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala rename to fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala index 18c592801b..595c8c0e14 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala @@ -15,11 +15,12 @@ * limitations under the License. 
*/ -package org.apache.fluss.spark.read +package org.apache.fluss.spark.read.lake import org.apache.fluss.config.Configuration import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.TablePath +import org.apache.fluss.spark.read.{FlussAppendInputPartition, FlussAppendPartitionReader, FlussLakeInputPartition} import org.apache.fluss.types.RowType import org.apache.spark.sql.catalyst.InternalRow diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeSourceUtils.scala similarity index 98% rename from fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala rename to fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeSourceUtils.scala index 41958c3442..8fb4ccf6bc 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeSourceUtils.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.fluss.spark.read +package org.apache.fluss.spark.read.lake import org.apache.fluss.config.{ConfigOptions, Configuration} import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala new file mode 100644 index 0000000000..d1bc2f07f5 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.read.lake + +import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer, SnapshotOffsetsInitializer} +import org.apache.fluss.client.table.scanner.log.LogScanner +import org.apache.fluss.config.Configuration +import org.apache.fluss.exception.LakeTableSnapshotNotExistException +import org.apache.fluss.lake.serializer.SimpleVersionedSerializer +import org.apache.fluss.lake.source.{LakeSource, LakeSplit} +import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath} +import org.apache.fluss.spark.read._ +import org.apache.fluss.utils.ExceptionUtils + +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.io.{ByteArrayOutputStream, DataOutputStream} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * Batch for reading lake-enabled primary key tables. Combines lake snapshot data with Fluss kv + * tail, merging them using sort-merge algorithm. 
+ */ +class FlussLakeUpsertBatch( + tablePath: TablePath, + tableInfo: TableInfo, + readSchema: StructType, + options: CaseInsensitiveStringMap, + flussConfig: Configuration) + extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) { + + override val startOffsetsInitializer: OffsetsInitializer = { + val offsetsInitializer = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig) + if (!offsetsInitializer.isInstanceOf[SnapshotOffsetsInitializer]) { + throw new UnsupportedOperationException("Upsert scan only support FULL startup mode.") + } + offsetsInitializer + } + + override val stoppingOffsetsInitializer: OffsetsInitializer = { + FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, flussConfig) + } + + private lazy val (partitions, isFallback) = doPlan() + + override def planInputPartitions(): Array[InputPartition] = partitions + + override def createReaderFactory(): PartitionReaderFactory = { + if (isFallback) { + new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig) + } else { + new FlussLakeUpsertPartitionReaderFactory( + tableInfo.getProperties.toMap, + tablePath, + projection, + flussConfig) + } + } + + /** + * Plans input partitions for reading. The returned isFallback flag is true when no lake snapshot + * exists and the plan falls back to pure Fluss kv reading. 
+ */ + private def doPlan(): (Array[InputPartition], Boolean) = { + val lakeSnapshot = + try { + admin.getReadableLakeSnapshot(tablePath).get() + } catch { + case e: Exception => + if ( + ExceptionUtils + .stripExecutionException(e) + .isInstanceOf[LakeTableSnapshotNotExistException] + ) { + return (planFallbackPartitions(), true) + } + throw e + } + + val lakeSource = FlussLakeSourceUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + lakeSource.withProject(FlussLakeSourceUtils.lakeProjection(projection)) + + val lakeSplits = lakeSource + .createPlanner(new LakeSource.PlannerContext { + override def snapshotId(): Long = lakeSnapshot.getSnapshotId + }) + .plan() + + val splitSerializer = lakeSource.getSplitSerializer + val tableBucketsOffset = lakeSnapshot.getTableBucketsOffset + val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath) + + val partitions = if (tableInfo.isPartitioned) { + planPartitionedTable( + lakeSplits.asScala, + splitSerializer, + tableBucketsOffset, + bucketOffsetsRetriever) + } else { + planNonPartitionedTable( + lakeSplits.asScala, + splitSerializer, + tableBucketsOffset, + bucketOffsetsRetriever) + } + + (partitions, false) + } + + private def planNonPartitionedTable( + lakeSplits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit], + tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], + bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = { + val tableId = tableInfo.getTableId + val buckets = (0 until tableInfo.getNumBuckets).toSeq + + val stoppingOffsets = + getBucketOffsets(stoppingOffsetsInitializer, null, buckets, bucketOffsetsRetriever) + + // Group lake splits by bucket + val lakeSplitsByBucket = groupLakeSplitsByBucket(lakeSplits, None) + + buckets.map { + bucketId => + val tableBucket = new TableBucket(tableId, bucketId) + val snapshotLogOffset = tableBucketsOffset.get(tableBucket) + val stoppingOffset = stoppingOffsets(bucketId) + + 
createLakeUpsertPartition( + tableBucket, + lakeSplitsByBucket.get(bucketId), + splitSerializer, + snapshotLogOffset, + stoppingOffset) + }.toArray + } + + private def planPartitionedTable( + lakeSplits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit], + tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], + bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = { + val tableId = tableInfo.getTableId + val buckets = (0 until tableInfo.getNumBuckets).toSeq + + val flussPartitionIdByName = mutable.LinkedHashMap.empty[String, Long] + partitionInfos.asScala.foreach { + pi => flussPartitionIdByName(pi.getPartitionName) = pi.getPartitionId + } + + val lakeSplitsByPartition = groupLakeSplitsByPartition(lakeSplits) + var lakeSplitPartitionId = -1L + + val lakePartitions = lakeSplitsByPartition.flatMap { + case (partitionName, splitsByBucket) => + flussPartitionIdByName.remove(partitionName) match { + case Some(partitionId) => + // Partition in both lake and Fluss + val stoppingOffsets = getBucketOffsets( + stoppingOffsetsInitializer, + partitionName, + buckets, + bucketOffsetsRetriever) + + buckets.map { + bucketId => + val tableBucket = new TableBucket(tableId, partitionId, bucketId) + val snapshotLogOffset = tableBucketsOffset.get(tableBucket) + val stoppingOffset = stoppingOffsets(bucketId) + + createLakeUpsertPartition( + tableBucket, + splitsByBucket.get(bucketId), + splitSerializer, + snapshotLogOffset, + stoppingOffset) + } + + case None => + // Partition only in lake (expired in Fluss) - create partitions with dummy partition id + val pid = lakeSplitPartitionId + lakeSplitPartitionId -= 1 + + buckets.map { + bucketId => + val tableBucket = new TableBucket(tableId, pid, bucketId) + // For expired partitions, there's no log tail to read + createLakeUpsertPartition( + tableBucket, + splitsByBucket.get(bucketId), + splitSerializer, + null, // no log offset + -1L // no stopping offset + ) + } + } + } + + // 
Partitions only in Fluss (not yet tiered) - read from earliest
+    val flussOnlyPartitions = flussPartitionIdByName.flatMap {
+      case (partitionName, partitionId) =>
+        val stoppingOffsets = getBucketOffsets(
+          stoppingOffsetsInitializer,
+          partitionName,
+          buckets,
+          bucketOffsetsRetriever)
+
+        buckets.map {
+          bucketId =>
+            val tableBucket = new TableBucket(tableId, partitionId, bucketId)
+            val stoppingOffset = stoppingOffsets(bucketId)
+
+            // No lake snapshot for this bucket, read log from earliest
+            FlussLakeUpsertInputPartition(
+              tableBucket,
+              null, // no lake splits
+              LogScanner.EARLIEST_OFFSET,
+              stoppingOffset
+            ): InputPartition
+        }
+    }
+
+    (lakePartitions ++ flussOnlyPartitions).toArray
+  }
+
+  /**
+   * Groups lake splits by partition name and then by bucket id.
+   *
+   * The partition name is the split's partition values joined with
+   * `ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR`, or the empty string when the split has no
+   * partition values. Splits of the same bucket keep the order in which they were passed in.
+   */
+  private def groupLakeSplitsByPartition(
+      lakeSplits: Seq[LakeSplit]): Map[String, mutable.Map[Int, Seq[LakeSplit]]] = {
+    val grouped = mutable.LinkedHashMap.empty[String, mutable.Map[Int, Seq[LakeSplit]]]
+    lakeSplits.foreach {
+      split =>
+        val partitionName = if (split.partition() == null || split.partition().isEmpty) {
+          ""
+        } else {
+          split.partition().asScala.mkString(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR)
+        }
+        val bucketId = split.bucket()
+        val bucketMap = grouped.getOrElseUpdate(partitionName, mutable.Map.empty)
+        val splits = bucketMap.getOrElse(bucketId, Seq.empty)
+        bucketMap(bucketId) = splits :+ split
+    }
+    // NOTE(review): the immutable Map built here may not keep the LinkedHashMap insertion
+    // order - confirm that no caller relies on partition ordering.
+    grouped.toMap
+  }
+
+  /**
+   * Groups lake splits by bucket id.
+   *
+   * NOTE(review): `partitionId` is not read by this method - confirm whether it can be dropped.
+   */
+  private def groupLakeSplitsByBucket(
+      lakeSplits: Seq[LakeSplit],
+      partitionId: Option[Long]): Map[Int, Seq[LakeSplit]] = {
+    lakeSplits.groupBy(_.bucket()).mapValues(_.toSeq).toMap
+  }
+
+  /**
+   * Builds the input partition of one bucket.
+   *
+   * When the bucket has lake splits they are serialized into a single byte array and the log is
+   * read from `snapshotLogOffset` (from the earliest offset if it is null). Without lake splits
+   * the partition carries null split bytes and the log is read from the earliest offset.
+   */
+  private def createLakeUpsertPartition(
+      tableBucket: TableBucket,
+      lakeSplits: Option[Seq[LakeSplit]],
+      splitSerializer: SimpleVersionedSerializer[LakeSplit],
+      snapshotLogOffset: java.lang.Long,
+      stoppingOffset: Long): InputPartition = {
+    val (lakeSplitBytes, logStartingOffset) =
+      if (lakeSplits.isDefined && lakeSplits.get.nonEmpty) {
+        // Serialize all lake splits for this bucket into a single byte array
+        val serialized = serializeLakeSplits(lakeSplits.get, splitSerializer)
+        val startOffset =
+          if (snapshotLogOffset != null) snapshotLogOffset.longValue()
+          else LogScanner.EARLIEST_OFFSET
+        (serialized, startOffset)
+      } else {
+        // No lake splits for this bucket
+        (null, LogScanner.EARLIEST_OFFSET)
+      }
+
+    FlussLakeUpsertInputPartition(
+      tableBucket,
+      lakeSplitBytes,
+      logStartingOffset,
+      stoppingOffset
+    )
+  }
+
+  /**
+   * Serializes the splits into one byte array: the serializer version (int), the number of
+   * splits (int), then for every split its length (int) followed by its serialized bytes.
+   */
+  private def serializeLakeSplits(
+      lakeSplits: Seq[LakeSplit],
+      splitSerializer: SimpleVersionedSerializer[LakeSplit]): Array[Byte] = {
+    val baos = new ByteArrayOutputStream()
+    val dos = new DataOutputStream(baos)
+
+    // Write serializer version
+    dos.writeInt(splitSerializer.getVersion)
+    // Write number of splits
+    dos.writeInt(lakeSplits.size)
+    // Write each split
+    for (split <- lakeSplits) {
+      val serialized = splitSerializer.serialize(split)
+      dos.writeInt(serialized.length)
+      dos.write(serialized)
+    }
+
+    dos.flush()
+    baos.toByteArray
+  }
+
+  /** Resolves the offsets of the given buckets through `initializer`, keyed by bucket id. */
+  private def getBucketOffsets(
+      initializer: OffsetsInitializer,
+      partitionName: String,
+      buckets: Seq[Int],
+      bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Map[Int, Long] = {
+    initializer
+      .getBucketOffsets(partitionName, buckets.map(Integer.valueOf).asJava, bucketOffsetsRetriever)
+      .asScala
+      .map(e => (e._1.intValue(), Long2long(e._2)))
+      .toMap
+  }
+
+  /**
+   * Plans one `FlussUpsertInputPartition` per bucket (per partition and bucket for partitioned
+   * tables). Every partition is created with snapshot id -1, the earliest log offset as start and
+   * the stopping offset resolved through `stoppingOffsetsInitializer`.
+   */
+  private def planFallbackPartitions(): Array[InputPartition] = {
+    // Fallback to pure Fluss kv reading when no lake snapshot exists
+    val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath)
+    val buckets = (0 until tableInfo.getNumBuckets).toSeq
+
+    def createPartitions(
+        partitionId: Option[Long],
+        partitionName: String): Array[InputPartition] = {
+      val stoppingOffsets =
+        getBucketOffsets(stoppingOffsetsInitializer, partitionName, buckets, bucketOffsetsRetriever)
+
+      buckets.map {
+        bucketId =>
+          val tableBucket = partitionId match {
+            case Some(pid) => new TableBucket(tableInfo.getTableId, pid, bucketId)
+            case None => new TableBucket(tableInfo.getTableId, bucketId)
+          }
+          // Use FlussUpsertInputPartition for fallback (reads from Fluss kv snapshot)
+          FlussUpsertInputPartition(
+            tableBucket,
+            -1L, // no snapshot
+            LogScanner.EARLIEST_OFFSET,
+            stoppingOffsets(bucketId)
+          ): InputPartition
+      }.toArray
+    }
+
+    if (tableInfo.isPartitioned) {
+      partitionInfos.asScala.flatMap {
+        pi => createPartitions(Some(pi.getPartitionId), pi.getPartitionName)
+      }.toArray
+    } else {
+      // A non-partitioned table has no partition id and no partition name
+      createPartitions(None, null)
+    }
+  }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala
new file mode 100644
index 0000000000..9dd77d43db
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read.lake
+
+import org.apache.fluss.client.table.scanner.batch.LakeSnapshotAndLogSplitScanner
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.row.InternalRow
+import org.apache.fluss.spark.read.{FlussLakeUpsertInputPartition, FlussPartitionReader}
+
+import org.apache.spark.internal.Logging
+
+import java.io.{ByteArrayInputStream, DataInputStream}
+import java.util.ArrayList
+
+import scala.collection.JavaConverters._
+
+/**
+ * Partition reader that reads lake-enabled primary key table data.
+ *
+ * Reads lake snapshot and Fluss log tail through [[LakeSnapshotAndLogSplitScanner]], merging them
+ * using sort-merge algorithm. For rows with the same primary key, log data takes precedence over
+ * lake snapshot data.
+ *
+ * The first non-empty batch is fetched eagerly while the reader is constructed.
+ */
+class FlussLakeUpsertPartitionReader(
+    tablePath: TablePath,
+    lakeSource: LakeSource[LakeSplit],
+    projection: Array[Int],
+    flussPartition: FlussLakeUpsertInputPartition,
+    flussConfig: Configuration)
+  extends FlussPartitionReader(tablePath, flussConfig)
+  with Logging {
+
+  // Lake splits of this bucket; stays null when the partition carries no serialized splits.
+  private val lakeSplits = if (flussPartition.lakeSplitBytes != null) {
+    deserializeLakeSplits(flussPartition.lakeSplitBytes, lakeSource.getSplitSerializer)
+  } else {
+    null
+  }
+  private val flussLakeSnapshotAndLogSplitScanner = new LakeSnapshotAndLogSplitScanner(
+    table,
+    lakeSource,
+    lakeSplits,
+    flussPartition.tableBucket,
+    flussPartition.logStartingOffset,
+    flussPartition.logStoppingOffset,
+    projection)
+  // Java iterator of the batch currently being consumed. `mergedIterator` is only the Scala
+  // wrapper created by `asScala`, which is never AutoCloseable, so this reference is what
+  // close0() has to release.
+  private var currentBatch: java.util.Iterator[InternalRow] = _
+  private var mergedIterator: Iterator[InternalRow] = Iterator.empty
+  private var scanFinished = false
+
+  initialize()
+
+  /**
+   * Advances to the next row and stores it in `currentRow`.
+   *
+   * @return
+   *   false once the reader is closed or the scanner reported the end of the scan
+   */
+  override def next(): Boolean = {
+    if (closed || scanFinished || !advanceToNonEmptyBatch()) {
+      return false
+    }
+    currentRow = convertToSparkRow(mergedIterator.next())
+    true
+  }
+
+  /**
+   * Polls the scanner until the current batch has a row to offer. A null batch marks the end of
+   * the scan; empty batches are skipped.
+   *
+   * @return
+   *   true if `mergedIterator` has a next row, false if the scan is complete
+   */
+  private def advanceToNonEmptyBatch(): Boolean = {
+    while (!mergedIterator.hasNext) {
+      val batch = flussLakeSnapshotAndLogSplitScanner.pollBatch(POLL_TIMEOUT)
+      if (batch == null) {
+        scanFinished = true
+        return false
+      }
+      currentBatch = batch
+      mergedIterator = batch.asScala
+    }
+    true
+  }
+
+  /**
+   * Restores the lake splits from the byte layout produced by the planner: serializer version
+   * (int), number of splits (int), then for every split its length (int) and its bytes.
+   */
+  private def deserializeLakeSplits(
+      bytes: Array[Byte],
+      splitSerializer: org.apache.fluss.lake.serializer.SimpleVersionedSerializer[LakeSplit])
+      : java.util.List[LakeSplit] = {
+    val dis = new DataInputStream(new ByteArrayInputStream(bytes))
+
+    val version = dis.readInt()
+    val numSplits = dis.readInt()
+    val splits = new ArrayList[LakeSplit](numSplits)
+
+    for (_ <- 0 until numSplits) {
+      val serialized = new Array[Byte](dis.readInt())
+      dis.readFully(serialized)
+      splits.add(splitSerializer.deserialize(version, serialized))
+    }
+
+    splits
+  }
+
+  /** Fetches the first non-empty batch (or detects an empty scan) and logs how long it took. */
+  private def initialize(): Unit = {
+    val currentTs = System.currentTimeMillis()
+    logInfo(s"Prepare read lake-enabled pk table $tablePath $flussPartition")
+
+    advanceToNonEmptyBatch()
+
+    val spend = (System.currentTimeMillis() - currentTs) / 1000
+    logInfo(s"Initialize FlussLakeUpsertPartitionReader cost $spend(s)")
+  }
+
+  /** Releases the batch being consumed and then the scanner, even if the former fails. */
+  override def close0(): Unit = {
+    try {
+      currentBatch match {
+        case closeable: AutoCloseable => closeable.close()
+        case _ => // no batch was fetched, or the batch holds nothing to release
+      }
+    } finally {
+      if (flussLakeSnapshotAndLogSplitScanner != null) {
+        flussLakeSnapshotAndLogSplitScanner.close()
+      }
+    }
+  }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala
new file mode 100644
index 0000000000..f09074026b
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read.lake
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.spark.read.FlussLakeUpsertInputPartition
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+
+import java.util
+
+/**
+ * Reader factory for lake-enabled primary key table reads. Each reader it hands out merges the
+ * lake snapshot of a bucket with the Fluss kv tail.
+ */
+class FlussLakeUpsertPartitionReaderFactory(
+    tableProperties: util.Map[String, String],
+    tablePath: TablePath,
+    projection: Array[Int],
+    flussConfig: Configuration)
+  extends PartitionReaderFactory {
+
+  // Transient and lazy: created on first use instead of being serialized with the factory.
+  @transient private lazy val lakeSource: LakeSource[LakeSplit] =
+    FlussLakeSourceUtils.createLakeSource(tableProperties, tablePath)
+
+  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
+    val upsertPartition = partition match {
+      case typed: FlussLakeUpsertInputPartition => typed
+      case other =>
+        throw new IllegalArgumentException(s"Unexpected partition type: ${other.getClass}")
+    }
+    new FlussLakeUpsertPartitionReader(
+      tablePath,
+      lakeSource,
+      projection,
+      upsertPartition,
+      flussConfig)
+  }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 11287fb7d4..34ab0cd5f9 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -63,6 +63,7 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession {
     super.beforeAll()
 
     sql(s"USE $DEFAULT_DATABASE")
+    spark.sparkContext.setLogLevel("INFO")
   }
 
   override protected def afterAll(): Unit = {
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergPkTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergPkTableReadTest.scala
new file mode 100644
index 0000000000..ca7929b728
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergPkTableReadTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.DataLakeFormat
+
+import java.nio.file.Files
+
+/** Runs the lake-enabled primary key table read tests against Iceberg with a hadoop catalog. */
+class SparkLakeIcebergPkTableReadTest extends SparkLakePkTableReadTestBase {
+
+  override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.ICEBERG
+
+  /** Cluster configuration with Iceberg as lake format, stored in the test warehouse. */
+  override protected def flussConf: Configuration = {
+    val conf = super.flussConf
+    conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString)
+    conf.setString("datalake.iceberg.type", "hadoop")
+    conf.setString("datalake.iceberg.warehouse", ensureWarehousePath())
+    conf
+  }
+
+  /** Catalog configuration pointing at the same warehouse as [[flussConf]]. */
+  override protected def lakeCatalogConf: Configuration = {
+    val conf = new Configuration()
+    conf.setString("type", "hadoop")
+    conf.setString("warehouse", ensureWarehousePath())
+    conf
+  }
+
+  /**
+   * Returns the warehouse path, creating the temporary directory only on first use. Both
+   * configuration methods are `def`s that may be evaluated more than once; creating the directory
+   * once keeps every evaluation pointing at the same warehouse.
+   */
+  private def ensureWarehousePath(): String = {
+    if (warehousePath == null) {
+      warehousePath = Files
+        .createTempDirectory("fluss-testing-iceberg-pk-lake-read")
+        .resolve("warehouse")
+        .toString
+    }
+    warehousePath
+  }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonPkTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonPkTableReadTest.scala
new file mode 100644
index 0000000000..9281970a3b
--- /dev/null
+++
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonPkTableReadTest.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.DataLakeFormat
+
+import java.nio.file.Files
+
+/** Runs the lake-enabled primary key table read tests against Paimon with a filesystem metastore. */
+class SparkLakePaimonPkTableReadTest extends SparkLakePkTableReadTestBase {
+
+  override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.PAIMON
+
+  /** Cluster configuration with Paimon as lake format, stored in the test warehouse. */
+  override protected def flussConf: Configuration = {
+    val conf = super.flussConf
+    conf.setString("datalake.format", DataLakeFormat.PAIMON.toString)
+    conf.setString("datalake.paimon.metastore", "filesystem")
+    conf.setString("datalake.paimon.cache-enabled", "false")
+    conf.setString("datalake.paimon.warehouse", ensureWarehousePath())
+    conf
+  }
+
+  /** Catalog configuration pointing at the same warehouse as [[flussConf]]. */
+  override protected def lakeCatalogConf: Configuration = {
+    val conf = new Configuration()
+    conf.setString("metastore", "filesystem")
+    conf.setString("warehouse", ensureWarehousePath())
+    conf
+  }
+
+  /**
+   * Returns the warehouse path, creating the temporary directory only on first use. Both
+   * configuration methods are `def`s that may be evaluated more than once; creating the directory
+   * once keeps every evaluation pointing at the same warehouse.
+   */
+  private def ensureWarehousePath(): String = {
+    if (warehousePath == null) {
+      warehousePath = Files
+        .createTempDirectory("fluss-testing-paimon-pk-lake-read")
+        .resolve("warehouse")
+        .toString
+    }
+    warehousePath
+  }
+}
diff --git
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePkTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePkTableReadTestBase.scala
new file mode 100644
index 0000000000..c424abda7b
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePkTableReadTestBase.scala
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.flink.tiering.LakeTieringJobBuilder
+import org.apache.fluss.flink.tiering.source.TieringSourceOptions
+import org.apache.fluss.metadata.{DataLakeFormat, TableBucket, TablePath}
+import org.apache.fluss.spark.FlussSparkTestBase
+import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_NUMBER, PRIMARY_KEY}
+
+import org.apache.flink.api.common.RuntimeExecutionMode
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.spark.sql.Row
+
+import java.time.Duration
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Base class for lake-enabled primary key table read tests. Subclasses provide the lake format
+ * config and lake catalog configuration.
+ */
+abstract class SparkLakePkTableReadTestBase extends FlussSparkTestBase {
+
+  /** Warehouse location shared by the cluster config and the lake catalog config. */
+  protected var warehousePath: String = _
+
+  /** The lake format used by this test. */
+  protected def dataLakeFormat: DataLakeFormat
+
+  /** Lake catalog configuration specific to the format. */
+  protected def lakeCatalogConf: Configuration
+
+  private val TIERING_PARALLELISM = 2
+  private val CHECKPOINT_INTERVAL_MS = 1000L
+  private val POLL_INTERVAL: Duration = Duration.ofMillis(500L)
+  private val SYNC_TIMEOUT: Duration = Duration.ofMinutes(2)
+  private val SYNC_POLL_INTERVAL_MS = 500L
+
+  /**
+   * Tier all pending data for the given table to the lake.
+   *
+   * Starts a tiering job, waits until every bucket of the table (every bucket of every partition
+   * for partitioned tables) reports a lake snapshot id, and cancels the job again. Fails the test
+   * if not all buckets are synced within [[SYNC_TIMEOUT]].
+   */
+  protected def tierToLake(tableName: String): Unit = {
+    val tablePath = createTablePath(tableName)
+    val tableInfo = loadFlussTable(tablePath).getTableInfo
+    val tableId = tableInfo.getTableId
+    val numBuckets = tableInfo.getNumBuckets
+
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
+    execEnv.setParallelism(TIERING_PARALLELISM)
+    execEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS)
+
+    val flussConfig = new Configuration(flussServer.getClientConfig)
+    flussConfig.set(TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL, POLL_INTERVAL)
+
+    val jobClient = LakeTieringJobBuilder
+      .newBuilder(
+        execEnv,
+        flussConfig,
+        lakeCatalogConf,
+        new Configuration(),
+        dataLakeFormat.toString)
+      .build()
+
+    try {
+      // Collect all buckets to wait for sync
+      val tableBuckets = if (tableInfo.isPartitioned) {
+        // For partitioned table, get all partitions and their buckets
+        val partitionInfos = admin.listPartitionInfos(tablePath).get()
+        partitionInfos.asScala.flatMap {
+          partitionInfo =>
+            (0 until numBuckets).map {
+              bucket => new TableBucket(tableId, partitionInfo.getPartitionId, bucket)
+            }
+        }.toSet
+      } else {
+        // For non-partitioned table, wait for every bucket. Waiting for bucket 0 only would let
+        // the job be cancelled while other buckets of a multi-bucket table are still untiered.
+        (0 until numBuckets).map(bucket => new TableBucket(tableId, bucket)).toSet
+      }
+
+      val deadline = System.currentTimeMillis() + SYNC_TIMEOUT.toMillis
+      val syncedBuckets = scala.collection.mutable.Set[TableBucket]()
+
+      while (syncedBuckets.size < tableBuckets.size && System.currentTimeMillis() < deadline) {
+        tableBuckets.foreach {
+          tableBucket =>
+            if (!syncedBuckets.contains(tableBucket)) {
+              try {
+                val replica = flussServer.waitAndGetLeaderReplica(tableBucket)
+                // For pk table, we also check the LogTablet's lake snapshot id
+                // which is updated when data is tiered to lake
+                if (replica.getLogTablet.getLakeTableSnapshotId >= 0) {
+                  syncedBuckets.add(tableBucket)
+                }
+              } catch {
+                // Deliberately best-effort: a bucket that cannot be inspected in this round
+                // (presumably because its leader is not ready yet) is retried in the next round
+                // until the deadline, where the assertion below reports it.
+                case _: Exception =>
+              }
+            }
+        }
+        if (syncedBuckets.size < tableBuckets.size) {
+          Thread.sleep(SYNC_POLL_INTERVAL_MS)
+        }
+      }
+
+      assert(
+        syncedBuckets.size == tableBuckets.size,
+        s"Not all buckets synced to lake within $SYNC_TIMEOUT. " +
+          s"Synced: ${syncedBuckets.size}, Total: ${tableBuckets.size}"
+      )
+    } finally {
+      jobClient.cancel().get()
+    }
+  }
+
+  /** Runs `f` and afterwards drops the given tables of the default database, even if `f` fails. */
+  override protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try {
+      f
+    } finally {
+      tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS $DEFAULT_DATABASE.$t"))
+    }
+  }
+
+  test("Spark Lake Read: pk table falls back when no lake snapshot") {
+    // Test non-partitioned table
+    withTable("t_non_partitioned") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_non_partitioned (id INT, name STRING, score INT)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             | '${PRIMARY_KEY.key()}' = 'id',
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_non_partitioned VALUES
+             |(1, "alice", 90), (2, "bob", 85), (3, "charlie", 95)
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_non_partitioned ORDER BY id"),
+        Row(1, "alice", 90) :: Row(2, "bob", 85) :: Row(3, "charlie", 95) :: Nil
+      )
+    }
+
+    // Test partitioned table
+    withTable("t_partitioned") {
+      sql(
+        s"""
+           |CREATE TABLE $DEFAULT_DATABASE.t_partitioned (id INT, name STRING, score INT, dt STRING)
+           | PARTITIONED BY (dt)
+           | TBLPROPERTIES (
+           | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+           | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+           | '${PRIMARY_KEY.key()}' = 'id,dt',
+           | '${BUCKET_NUMBER.key()}' = 1)
+           |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_partitioned VALUES
+             |(1, "alice", 90, "2026-01-01"),
+             |(2, "bob", 85, "2026-01-01"),
+             |(3, "charlie", 95, "2026-01-02")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_partitioned ORDER BY id"),
+        Row(1, "alice", 90, "2026-01-01") ::
+          Row(2, "bob", 85, "2026-01-01") ::
+          Row(3, "charlie", 95, "2026-01-02") :: Nil
+      )
+
+      // Test column projection with different order
+      checkAnswer(
+        sql(s"SELECT dt, name, id FROM $DEFAULT_DATABASE.t_partitioned ORDER BY id"),
+        Row("2026-01-01", "alice", 1) ::
+          Row("2026-01-01", "bob", 2) ::
+          Row("2026-01-02", "charlie", 3) :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: pk table lake-only (all data in lake, no kv tail)") {
+    // Test non-partitioned table
+    withTable("t_lake_only") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_lake_only (id INT, name STRING, score INT)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             | '${PRIMARY_KEY.key()}' = 'id',
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_lake_only VALUES
+             |(1, "alice", 90), (2, "bob", 85), (3, "charlie", 95)
+             |""".stripMargin)
+
+      tierToLake("t_lake_only")
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_lake_only ORDER BY id"),
+        Row(1, "alice", 90) :: Row(2, "bob", 85) :: Row(3, "charlie", 95) :: Nil
+      )
+    }
+
+    // Test partitioned table
+    withTable("t_lake_only_partitioned") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_lake_only_partitioned (id INT, name STRING, score INT, dt STRING)
+             | PARTITIONED BY (dt)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             | '${PRIMARY_KEY.key()}' = 'id,dt',
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_lake_only_partitioned VALUES
+             |(1, "alice", 90, "2026-01-01"),
+             |(2, "bob", 85, "2026-01-01"),
+             |(3, "charlie", 95, "2026-01-02")
+             |""".stripMargin)
+
+      tierToLake("t_lake_only_partitioned")
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_lake_only_partitioned ORDER BY id"),
+        Row(1, "alice", 90, "2026-01-01") ::
+          Row(2, "bob", 85, "2026-01-01") ::
+          Row(3, "charlie", 95, "2026-01-02") :: Nil
+      )
+
+      // Test column projection with different order
+      checkAnswer(
+        sql(s"SELECT dt, name, id FROM $DEFAULT_DATABASE.t_lake_only_partitioned ORDER BY id"),
+        Row("2026-01-01", "alice", 1) ::
+          Row("2026-01-01", "bob", 2) ::
+          Row("2026-01-02", "charlie", 3) :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: pk table union read (lake + kv tail)") {
+    // Test non-partitioned table
+    withTable("t_union") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_union (id INT, name STRING, score INT)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             | '${PRIMARY_KEY.key()}' = 'id',
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+             |(1, "alice", 90), (2, "bob", 85), (3, "charlie", 95)
+             |""".stripMargin)
+
+      tierToLake("t_union")
+
+      // Insert more data after tiering (this will be in kv tail)
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+             |(4, "david", 88), (5, "eve", 92)
+             |""".stripMargin)
+
+      // Union read: should see both lake data and kv tail data
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_union ORDER BY id"),
+        Row(1, "alice", 90) :: Row(2, "bob", 85) :: Row(3, "charlie", 95) ::
+          Row(4, "david", 88) :: Row(5, "eve", 92) :: Nil
+      )
+    }
+
+    // Test partitioned table
+    withTable("t_union_partitioned") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_union_partitioned (id INT, name STRING, score INT, dt STRING)
+             | PARTITIONED BY (dt)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             | '${PRIMARY_KEY.key()}' = 'id,dt',
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union_partitioned VALUES
+             |(1, "alice", 90, "2026-01-01"),
+             |(2, "bob", 85, "2026-01-01"),
+             |(3, "charlie", 95, "2026-01-02")
+             |""".stripMargin)
+
+      tierToLake("t_union_partitioned")
+
+      // Insert more data after tiering
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union_partitioned VALUES
+             |(4, "david", 88, "2026-01-01"),
+             |(5, "eve", 92, "2026-01-03")
+             |""".stripMargin)
+
+      // Union read with partition filter
+      checkAnswer(
+        sql(s"""
+               |SELECT * FROM $DEFAULT_DATABASE.t_union_partitioned
+               |WHERE dt = '2026-01-01' ORDER BY id""".stripMargin),
+        Row(1, "alice", 90, "2026-01-01") ::
+          Row(2, "bob", 85, "2026-01-01") ::
+          Row(4, "david", 88, "2026-01-01") :: Nil
+      )
+
+      // Union read all partitions
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_union_partitioned ORDER BY id"),
+        Row(1, "alice", 90, "2026-01-01") ::
+          Row(2, "bob", 85, "2026-01-01") ::
+          Row(3, "charlie", 95, "2026-01-02") ::
+          Row(4, "david", 88, "2026-01-01") ::
+          Row(5, "eve", 92, "2026-01-03") :: Nil
+      )
+
+      // Test column projection with different order
+      checkAnswer(
+        sql(s"SELECT dt, name, id FROM $DEFAULT_DATABASE.t_union_partitioned ORDER BY id"),
+        Row("2026-01-01", "alice", 1) ::
+          Row("2026-01-01", "bob", 2) ::
+          Row("2026-01-02", "charlie", 3) ::
+          Row("2026-01-01", "david", 4) ::
+          Row("2026-01-03", "eve", 5) :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: pk table union read with updates") {
+    // Test non-partitioned table
+    withTable("t_update") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_update (id INT, name STRING, score INT)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             | '${PRIMARY_KEY.key()}' = 'id',
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_update VALUES
+             |(1, "alice", 90), (2, "bob", 85), (3, "charlie", 95)
+             |""".stripMargin)
+
+      tierToLake("t_update")
+
+      // Update existing record and insert new record
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_update VALUES
+             |(2, "bob_updated", 100), (4, "david", 88)
+             |""".stripMargin)
+
+      // Union read: should see updated value for id=2 from kv tail
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_update ORDER BY id"),
+        Row(1, "alice", 90) :: Row(2, "bob_updated", 100) ::
+          Row(3, "charlie", 95) :: Row(4, "david", 88) :: Nil
+      )
+    }
+
+    // Test partitioned table
+    withTable("t_update_partitioned") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_update_partitioned (id INT, name STRING, score INT, dt STRING)
+             | PARTITIONED BY (dt)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             | '${PRIMARY_KEY.key()}' = 'id,dt',
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_update_partitioned VALUES
+             |(1, "alice", 90, "2026-01-01"),
+             |(2, "bob", 85, "2026-01-01"),
+             |(3, "charlie", 95, "2026-01-02")
+             |""".stripMargin)
+
+      tierToLake("t_update_partitioned")
+
+      // Update existing record and insert new record
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_update_partitioned VALUES
+             |(2, "bob_updated", 100, "2026-01-01"),
+             |(4, "david", 88, "2026-01-02")
+             |""".stripMargin)
+
+      // Union read: should see updated value for id=2 from kv tail
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_update_partitioned ORDER BY id"),
+        Row(1, "alice", 90, "2026-01-01") ::
+          Row(2, "bob_updated", 100, "2026-01-01") ::
+          Row(3, "charlie", 95, "2026-01-02") ::
+          Row(4, "david", 88, "2026-01-02") :: Nil
+      )
+
+      // Test column projection with different order
+      checkAnswer(
+        sql(s"SELECT dt, name, id FROM $DEFAULT_DATABASE.t_update_partitioned ORDER BY id"),
+        Row("2026-01-01", "alice", 1) ::
+          Row("2026-01-01", "bob_updated", 2) ::
+          Row("2026-01-02", "charlie", 3) ::
+          Row("2026-01-02", "david", 4) :: Nil
+      )
+    }
+  }
+}
From 6f4f4770b7b5a88242b020a01aaab53388dc6e0d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?=
Date: Thu, 9 Apr 2026 14:45:50 +0800
Subject: [PATCH 2/7] compact test files

---
 .../SparkLakeIcebergLogTableReadTest.scala | 45 -----
 .../SparkLakeIcebergPkTableReadTest.scala | 45 -----
 ....scala => SparkLakeLogTableReadTest.scala} | 120 ++++++--------
 .../SparkLakePaimonLogTableReadTest.scala | 46 ------
 .../lake/SparkLakePaimonPkTableReadTest.scala | 46 ------
 ...parkLakePrimaryKeyTableReadTestBase.scala} | 154 ++++++------------
 .../lake/SparkLakeTableReadTestBase.scala | 133 +++++++++++++++
 7 files changed, 227 insertions(+), 362 deletions(-)
 delete mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala
 delete mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergPkTableReadTest.scala
 rename fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/{SparkLakeLogTableReadTestBase.scala => SparkLakeLogTableReadTest.scala} (69%)
 delete mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala
 delete mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonPkTableReadTest.scala
 rename
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/{SparkLakePkTableReadTestBase.scala => SparkLakePrimaryKeyTableReadTestBase.scala} (75%) create mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala deleted file mode 100644 index fad9bcd902..0000000000 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.fluss.spark.lake - -import org.apache.fluss.config.Configuration -import org.apache.fluss.metadata.DataLakeFormat - -import java.nio.file.Files - -class SparkLakeIcebergLogTableReadTest extends SparkLakeLogTableReadTestBase { - - override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.ICEBERG - - override protected def flussConf: Configuration = { - val conf = super.flussConf - conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString) - conf.setString("datalake.iceberg.type", "hadoop") - warehousePath = - Files.createTempDirectory("fluss-testing-iceberg-lake-read").resolve("warehouse").toString - conf.setString("datalake.iceberg.warehouse", warehousePath) - conf - } - - override protected def lakeCatalogConf: Configuration = { - val conf = new Configuration() - conf.setString("type", "hadoop") - conf.setString("warehouse", warehousePath) - conf - } -} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergPkTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergPkTableReadTest.scala deleted file mode 100644 index ca7929b728..0000000000 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergPkTableReadTest.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.spark.lake - -import org.apache.fluss.config.Configuration -import org.apache.fluss.metadata.DataLakeFormat - -import java.nio.file.Files - -class SparkLakeIcebergPkTableReadTest extends SparkLakePkTableReadTestBase { - - override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.ICEBERG - - override protected def flussConf: Configuration = { - val conf = super.flussConf - conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString) - conf.setString("datalake.iceberg.type", "hadoop") - warehousePath = - Files.createTempDirectory("fluss-testing-iceberg-pk-lake-read").resolve("warehouse").toString - conf.setString("datalake.iceberg.warehouse", warehousePath) - conf - } - - override protected def lakeCatalogConf: Configuration = { - val conf = new Configuration() - conf.setString("type", "hadoop") - conf.setString("warehouse", warehousePath) - conf - } -} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala similarity index 69% rename from fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala rename to fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala index a238394616..d0127e1186 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala +++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTest.scala @@ -18,85 +18,14 @@ package org.apache.fluss.spark.lake import org.apache.fluss.config.{ConfigOptions, Configuration} -import org.apache.fluss.flink.tiering.LakeTieringJobBuilder -import org.apache.fluss.flink.tiering.source.TieringSourceOptions -import org.apache.fluss.metadata.{DataLakeFormat, TableBucket} -import org.apache.fluss.spark.FlussSparkTestBase +import org.apache.fluss.metadata.DataLakeFormat import org.apache.fluss.spark.SparkConnectorOptions.BUCKET_NUMBER -import org.apache.flink.api.common.RuntimeExecutionMode -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.spark.sql.Row -import java.time.Duration +import java.nio.file.Files -/** - * Base class for lake-enabled log table read tests. Subclasses provide the lake format config and - * lake catalog configuration. - */ -abstract class SparkLakeLogTableReadTestBase extends FlussSparkTestBase { - - protected var warehousePath: String = _ - - /** The lake format used by this test. */ - protected def dataLakeFormat: DataLakeFormat - - /** Lake catalog configuration specific to the format. */ - protected def lakeCatalogConf: Configuration - - private val TIERING_PARALLELISM = 2 - private val CHECKPOINT_INTERVAL_MS = 1000L - private val POLL_INTERVAL: Duration = Duration.ofMillis(500L) - private val SYNC_TIMEOUT: Duration = Duration.ofMinutes(2) - private val SYNC_POLL_INTERVAL_MS = 500L - - /** Tier all pending data for the given table to the lake. 
*/ - protected def tierToLake(tableName: String): Unit = { - val tableId = loadFlussTable(createTablePath(tableName)).getTableInfo.getTableId - - val execEnv = StreamExecutionEnvironment.getExecutionEnvironment - execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING) - execEnv.setParallelism(TIERING_PARALLELISM) - execEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS) - - val flussConfig = new Configuration(flussServer.getClientConfig) - flussConfig.set(TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL, POLL_INTERVAL) - - val jobClient = LakeTieringJobBuilder - .newBuilder( - execEnv, - flussConfig, - lakeCatalogConf, - new Configuration(), - dataLakeFormat.toString) - .build() - - try { - val tableBucket = new TableBucket(tableId, 0) - val deadline = System.currentTimeMillis() + SYNC_TIMEOUT.toMillis - var synced = false - while (!synced && System.currentTimeMillis() < deadline) { - try { - val replica = flussServer.waitAndGetLeaderReplica(tableBucket) - synced = replica.getLogTablet.getLakeTableSnapshotId >= 0 - } catch { - case _: Exception => - } - if (!synced) Thread.sleep(SYNC_POLL_INTERVAL_MS) - } - assert(synced, s"Bucket $tableBucket not synced to lake within $SYNC_TIMEOUT") - } finally { - jobClient.cancel().get() - } - } - - override protected def withTable(tableNames: String*)(f: => Unit): Unit = { - try { - f - } finally { - tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS $DEFAULT_DATABASE.$t")) - } - } +abstract class SparkLakeLogTableReadTest extends SparkLakeTableReadTestBase { test("Spark Lake Read: log table falls back when no lake snapshot") { withTable("t") { @@ -251,3 +180,46 @@ abstract class SparkLakeLogTableReadTestBase extends FlussSparkTestBase { } } } + +class SparkLakePaimonLogTableReadTest extends SparkLakeLogTableReadTest { + override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.PAIMON + + override protected def flussConf: Configuration = { + val conf = super.flussConf + conf.setString("datalake.format", 
DataLakeFormat.PAIMON.toString) + conf.setString("datalake.paimon.metastore", "filesystem") + conf.setString("datalake.paimon.cache-enabled", "false") + warehousePath = + Files.createTempDirectory("fluss-testing-lake-read").resolve("warehouse").toString + conf.setString("datalake.paimon.warehouse", warehousePath) + conf + } + + override protected def lakeCatalogConf: Configuration = { + val conf = new Configuration() + conf.setString("metastore", "filesystem") + conf.setString("warehouse", warehousePath) + conf + } +} + +class SparkLakeIcebergLogTableReadTest extends SparkLakeLogTableReadTest { + override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.ICEBERG + + override protected def flussConf: Configuration = { + val conf = super.flussConf + conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString) + conf.setString("datalake.iceberg.type", "hadoop") + warehousePath = + Files.createTempDirectory("fluss-testing-iceberg-lake-read").resolve("warehouse").toString + conf.setString("datalake.iceberg.warehouse", warehousePath) + conf + } + + override protected def lakeCatalogConf: Configuration = { + val conf = new Configuration() + conf.setString("type", "hadoop") + conf.setString("warehouse", warehousePath) + conf + } +} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala deleted file mode 100644 index 2e2b941f4b..0000000000 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.spark.lake - -import org.apache.fluss.config.Configuration -import org.apache.fluss.metadata.DataLakeFormat - -import java.nio.file.Files - -class SparkLakePaimonLogTableReadTest extends SparkLakeLogTableReadTestBase { - - override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.PAIMON - - override protected def flussConf: Configuration = { - val conf = super.flussConf - conf.setString("datalake.format", DataLakeFormat.PAIMON.toString) - conf.setString("datalake.paimon.metastore", "filesystem") - conf.setString("datalake.paimon.cache-enabled", "false") - warehousePath = - Files.createTempDirectory("fluss-testing-lake-read").resolve("warehouse").toString - conf.setString("datalake.paimon.warehouse", warehousePath) - conf - } - - override protected def lakeCatalogConf: Configuration = { - val conf = new Configuration() - conf.setString("metastore", "filesystem") - conf.setString("warehouse", warehousePath) - conf - } -} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonPkTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonPkTableReadTest.scala deleted file mode 100644 index 9281970a3b..0000000000 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonPkTableReadTest.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - 
* Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.spark.lake - -import org.apache.fluss.config.Configuration -import org.apache.fluss.metadata.DataLakeFormat - -import java.nio.file.Files - -class SparkLakePaimonPkTableReadTest extends SparkLakePkTableReadTestBase { - - override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.PAIMON - - override protected def flussConf: Configuration = { - val conf = super.flussConf - conf.setString("datalake.format", DataLakeFormat.PAIMON.toString) - conf.setString("datalake.paimon.metastore", "filesystem") - conf.setString("datalake.paimon.cache-enabled", "false") - warehousePath = - Files.createTempDirectory("fluss-testing-paimon-pk-lake-read").resolve("warehouse").toString - conf.setString("datalake.paimon.warehouse", warehousePath) - conf - } - - override protected def lakeCatalogConf: Configuration = { - val conf = new Configuration() - conf.setString("metastore", "filesystem") - conf.setString("warehouse", warehousePath) - conf - } -} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePkTableReadTestBase.scala 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala similarity index 75% rename from fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePkTableReadTestBase.scala rename to fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala index c424abda7b..b0f3412f35 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePkTableReadTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala @@ -18,121 +18,18 @@ package org.apache.fluss.spark.lake import org.apache.fluss.config.{ConfigOptions, Configuration} -import org.apache.fluss.flink.tiering.LakeTieringJobBuilder -import org.apache.fluss.flink.tiering.source.TieringSourceOptions -import org.apache.fluss.metadata.{DataLakeFormat, TableBucket, TablePath} -import org.apache.fluss.spark.FlussSparkTestBase +import org.apache.fluss.metadata.DataLakeFormat import org.apache.fluss.spark.SparkConnectorOptions.{BUCKET_NUMBER, PRIMARY_KEY} -import org.apache.flink.api.common.RuntimeExecutionMode -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.spark.sql.Row -import java.time.Duration - -import scala.jdk.CollectionConverters._ +import java.nio.file.Files /** * Base class for lake-enabled primary key table read tests. Subclasses provide the lake format * config and lake catalog configuration. */ -abstract class SparkLakePkTableReadTestBase extends FlussSparkTestBase { - - protected var warehousePath: String = _ - - /** The lake format used by this test. */ - protected def dataLakeFormat: DataLakeFormat - - /** Lake catalog configuration specific to the format. 
*/ - protected def lakeCatalogConf: Configuration - - private val TIERING_PARALLELISM = 2 - private val CHECKPOINT_INTERVAL_MS = 1000L - private val POLL_INTERVAL: Duration = Duration.ofMillis(500L) - private val SYNC_TIMEOUT: Duration = Duration.ofMinutes(2) - private val SYNC_POLL_INTERVAL_MS = 500L - - /** Tier all pending data for the given table to the lake. */ - protected def tierToLake(tableName: String): Unit = { - val tablePath = createTablePath(tableName) - val tableInfo = loadFlussTable(tablePath).getTableInfo - val tableId = tableInfo.getTableId - val numBuckets = tableInfo.getNumBuckets - - val execEnv = StreamExecutionEnvironment.getExecutionEnvironment - execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING) - execEnv.setParallelism(TIERING_PARALLELISM) - execEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS) - - val flussConfig = new Configuration(flussServer.getClientConfig) - flussConfig.set(TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL, POLL_INTERVAL) - - val jobClient = LakeTieringJobBuilder - .newBuilder( - execEnv, - flussConfig, - lakeCatalogConf, - new Configuration(), - dataLakeFormat.toString) - .build() - - try { - // Collect all buckets to wait for sync - val tableBuckets = if (tableInfo.isPartitioned) { - // For partitioned table, get all partitions and their buckets - val partitionInfos = admin.listPartitionInfos(tablePath).get() - partitionInfos.asScala.flatMap { - partitionInfo => - (0 until numBuckets).map { - bucket => new TableBucket(tableId, partitionInfo.getPartitionId, bucket) - } - }.toSet - } else { - // For non-partitioned table, just use bucket 0 - Set(new TableBucket(tableId, 0)) - } - - val deadline = System.currentTimeMillis() + SYNC_TIMEOUT.toMillis - val syncedBuckets = scala.collection.mutable.Set[TableBucket]() - - while (syncedBuckets.size < tableBuckets.size && System.currentTimeMillis() < deadline) { - tableBuckets.foreach { - tableBucket => - if (!syncedBuckets.contains(tableBucket)) { - try { - val replica = 
flussServer.waitAndGetLeaderReplica(tableBucket) - // For pk table, we also check the LogTablet's lake snapshot id - // which is updated when data is tiered to lake - if (replica.getLogTablet.getLakeTableSnapshotId >= 0) { - syncedBuckets.add(tableBucket) - } - } catch { - case _: Exception => - } - } - } - if (syncedBuckets.size < tableBuckets.size) { - Thread.sleep(SYNC_POLL_INTERVAL_MS) - } - } - - assert( - syncedBuckets.size == tableBuckets.size, - s"Not all buckets synced to lake within $SYNC_TIMEOUT. " + - s"Synced: ${syncedBuckets.size}, Total: ${tableBuckets.size}" - ) - } finally { - jobClient.cancel().get() - } - } - - override protected def withTable(tableNames: String*)(f: => Unit): Unit = { - try { - f - } finally { - tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS $DEFAULT_DATABASE.$t")) - } - } +abstract class SparkLakePrimaryKeyTableReadTestBase extends SparkLakeTableReadTestBase { test("Spark Lake Read: pk table falls back when no lake snapshot") { // Test non-partitioned table @@ -431,3 +328,48 @@ abstract class SparkLakePkTableReadTestBase extends FlussSparkTestBase { } } } + +class SparkLakePaimonPrimaryKeyTableReadTestBase extends SparkLakePrimaryKeyTableReadTestBase { + + override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.PAIMON + + override protected def flussConf: Configuration = { + val conf = super.flussConf + conf.setString("datalake.format", DataLakeFormat.PAIMON.toString) + conf.setString("datalake.paimon.metastore", "filesystem") + conf.setString("datalake.paimon.cache-enabled", "false") + warehousePath = + Files.createTempDirectory("fluss-testing-paimon-pk-lake-read").resolve("warehouse").toString + conf.setString("datalake.paimon.warehouse", warehousePath) + conf + } + + override protected def lakeCatalogConf: Configuration = { + val conf = new Configuration() + conf.setString("metastore", "filesystem") + conf.setString("warehouse", warehousePath) + conf + } +} + +class 
SparkLakeIcebergPrimaryKeyTableReadTestBase extends SparkLakePrimaryKeyTableReadTestBase { + + override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.ICEBERG + + override protected def flussConf: Configuration = { + val conf = super.flussConf + conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString) + conf.setString("datalake.iceberg.type", "hadoop") + warehousePath = + Files.createTempDirectory("fluss-testing-iceberg-pk-lake-read").resolve("warehouse").toString + conf.setString("datalake.iceberg.warehouse", warehousePath) + conf + } + + override protected def lakeCatalogConf: Configuration = { + val conf = new Configuration() + conf.setString("type", "hadoop") + conf.setString("warehouse", warehousePath) + conf + } +} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala new file mode 100644 index 0000000000..f8a5b65b2c --- /dev/null +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeTableReadTestBase.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.lake + +import org.apache.fluss.config.Configuration +import org.apache.fluss.flink.tiering.LakeTieringJobBuilder +import org.apache.fluss.flink.tiering.source.TieringSourceOptions +import org.apache.fluss.metadata.{DataLakeFormat, TableBucket} +import org.apache.fluss.spark.FlussSparkTestBase + +import org.apache.flink.api.common.RuntimeExecutionMode +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment + +import java.time.Duration + +import scala.jdk.CollectionConverters._ + +/** + * Base class for lake-enabled table read tests. Subclasses provide the lake format config and lake + * catalog configuration. + */ +abstract class SparkLakeTableReadTestBase extends FlussSparkTestBase { + + protected var warehousePath: String = _ + + /** The lake format used by this test. */ + protected def dataLakeFormat: DataLakeFormat + + /** Lake catalog configuration specific to the format. */ + protected def lakeCatalogConf: Configuration + + private val TIERING_PARALLELISM = 2 + private val CHECKPOINT_INTERVAL_MS = 1000L + private val POLL_INTERVAL: Duration = Duration.ofMillis(500L) + private val SYNC_TIMEOUT: Duration = Duration.ofMinutes(2) + private val SYNC_POLL_INTERVAL_MS = 500L + + /** Tier all pending data for the given table to the lake. 
*/ + protected def tierToLake(tableName: String): Unit = { + val tablePath = createTablePath(tableName) + val tableInfo = loadFlussTable(tablePath).getTableInfo + val tableId = tableInfo.getTableId + val numBuckets = tableInfo.getNumBuckets + + val execEnv = StreamExecutionEnvironment.getExecutionEnvironment + execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING) + execEnv.setParallelism(TIERING_PARALLELISM) + execEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS) + + val flussConfig = new Configuration(flussServer.getClientConfig) + flussConfig.set(TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL, POLL_INTERVAL) + + val jobClient = LakeTieringJobBuilder + .newBuilder( + execEnv, + flussConfig, + lakeCatalogConf, + new Configuration(), + dataLakeFormat.toString) + .build() + + try { + // Collect all buckets to wait for sync + val tableBuckets = if (tableInfo.isPartitioned) { + // For partitioned table, get all partitions and their buckets + val partitionInfos = admin.listPartitionInfos(tablePath).get() + partitionInfos.asScala.flatMap { + partitionInfo => + (0 until numBuckets).map { + bucket => new TableBucket(tableId, partitionInfo.getPartitionId, bucket) + } + }.toSet + } else { + // For non-partitioned table, just use bucket 0 + Set(new TableBucket(tableId, 0)) + } + + val deadline = System.currentTimeMillis() + SYNC_TIMEOUT.toMillis + val syncedBuckets = scala.collection.mutable.Set[TableBucket]() + + while (syncedBuckets.size < tableBuckets.size && System.currentTimeMillis() < deadline) { + tableBuckets.foreach { + tableBucket => + if (!syncedBuckets.contains(tableBucket)) { + try { + val replica = flussServer.waitAndGetLeaderReplica(tableBucket) + if (replica.getLogTablet.getLakeTableSnapshotId >= 0) { + syncedBuckets.add(tableBucket) + } + } catch { + case _: Exception => + } + } + } + if (syncedBuckets.size < tableBuckets.size) { + Thread.sleep(SYNC_POLL_INTERVAL_MS) + } + } + + assert( + syncedBuckets.size == tableBuckets.size, + s"Not all buckets synced to 
lake within $SYNC_TIMEOUT. " + + s"Synced: ${syncedBuckets.size}, Total: ${tableBuckets.size}" + ) + } finally { + jobClient.cancel().get() + } + } + + override protected def withTable(tableNames: String*)(f: => Unit): Unit = { + try { + f + } finally { + tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS $DEFAULT_DATABASE.$t")) + } + } + +} From d8002da622de049eff0fd6d8a7d40c0dc2d35685 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Thu, 9 Apr 2026 16:49:07 +0800 Subject: [PATCH 3/7] remove duplicate code --- .../read/lake/FlussLakeAppendBatch.scala | 289 ++++++++++++++++++ ...a => FlussLakeAppendPartitionReader.scala} | 7 +- ...ussLakeAppendPartitionReaderFactory.scala} | 8 +- .../spark/read/lake/FlussLakeBatch.scala | 261 +--------------- .../read/lake/FlussLakeUpsertBatch.scala | 71 +---- .../lake/FlussLakeUpsertPartitionReader.scala | 33 +- ...lussLakeUpsertPartitionReaderFactory.scala | 3 +- ...SourceUtils.scala => FlussLakeUtils.scala} | 51 +++- .../fluss/spark/FlussSparkTestBase.scala | 1 - 9 files changed, 363 insertions(+), 361 deletions(-) create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala rename fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/{FlussLakePartitionReader.scala => FlussLakeAppendPartitionReader.scala} (92%) rename fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/{FlussLakePartitionReaderFactory.scala => FlussLakeAppendPartitionReaderFactory.scala} (88%) rename fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/{FlussLakeSourceUtils.scala => FlussLakeUtils.scala} (56%) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala new file mode 100644 index 0000000000..384cb4bdbd --- /dev/null +++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.read.lake + +import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer} +import org.apache.fluss.client.table.scanner.log.LogScanner +import org.apache.fluss.config.Configuration +import org.apache.fluss.exception.LakeTableSnapshotNotExistException +import org.apache.fluss.lake.serializer.SimpleVersionedSerializer +import org.apache.fluss.lake.source.{LakeSource, LakeSplit} +import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath} +import org.apache.fluss.spark.read._ +import org.apache.fluss.utils.ExceptionUtils + +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** Batch for reading lake-enabled log table (append-only table with datalake). 
*/ +class FlussLakeAppendBatch( + tablePath: TablePath, + tableInfo: TableInfo, + readSchema: StructType, + options: CaseInsensitiveStringMap, + flussConfig: Configuration) + extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) { + + // Required by FlussLakeBatch but unused — lake snapshot determines start offsets. + override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest() + + override def createReaderFactory(): PartitionReaderFactory = { + if (isFallback) { + new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig) + } else { + new FlussLakeAppendPartitionReaderFactory( + tableInfo.getProperties.toMap, + tablePath, + tableInfo.getRowType, + projection, + flussConfig) + } + } + + override def doPlan(): (Array[InputPartition], Boolean) = { + val lakeSnapshot = + try { + admin.getReadableLakeSnapshot(tablePath).get() + } catch { + case e: Exception => + if ( + ExceptionUtils + .stripExecutionException(e) + .isInstanceOf[LakeTableSnapshotNotExistException] + ) { + return (planFallbackPartitions(), true) + } + throw e + } + + val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + lakeSource.withProject(FlussLakeUtils.lakeProjection(projection)) + + val lakeSplits = lakeSource + .createPlanner(new LakeSource.PlannerContext { + override def snapshotId(): Long = lakeSnapshot.getSnapshotId + }) + .plan() + + val splitSerializer = lakeSource.getSplitSerializer + val tableBucketsOffset = lakeSnapshot.getTableBucketsOffset + val buckets = (0 until tableInfo.getNumBuckets).toSeq + val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath) + + val partitions = if (tableInfo.isPartitioned) { + planPartitionedTable( + lakeSplits.asScala, + splitSerializer, + tableBucketsOffset, + buckets, + bucketOffsetsRetriever) + } else { + planNonPartitionedTable( + lakeSplits.asScala, + splitSerializer, + tableBucketsOffset, + buckets, + 
bucketOffsetsRetriever) + } + + (partitions, false) + } + + private def planNonPartitionedTable( + lakeSplits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit], + tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], + buckets: Seq[Int], + bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = { + val tableId = tableInfo.getTableId + + val lakePartitions = + createLakePartitions(lakeSplits, splitSerializer, tableId, partitionId = None) + + val stoppingOffsets = + getBucketOffsets(stoppingOffsetsInitializer, null, buckets, bucketOffsetsRetriever) + val logPartitions = buckets.flatMap { + bucketId => + val tableBucket = new TableBucket(tableId, bucketId) + createLogTailPartition(tableBucket, tableBucketsOffset, stoppingOffsets(bucketId)) + } + + (lakePartitions ++ logPartitions).toArray + } + + private def planPartitionedTable( + lakeSplits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit], + tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], + buckets: Seq[Int], + bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = { + val tableId = tableInfo.getTableId + + val flussPartitionIdByName = mutable.LinkedHashMap.empty[String, Long] + partitionInfos.asScala.foreach { + pi => flussPartitionIdByName(pi.getPartitionName) = pi.getPartitionId + } + + val lakeSplitsByPartition = groupLakeSplitsByPartition(lakeSplits) + var lakeSplitPartitionId = -1L + + val lakeAndLogPartitions = lakeSplitsByPartition.flatMap { + case (partitionName, splits) => + flussPartitionIdByName.remove(partitionName) match { + case Some(partitionId) => + // Partition in both lake and Fluss — lake splits + log tail + val lakePartitions = + createLakePartitions(splits, splitSerializer, tableId, Some(partitionId)) + + val stoppingOffsets = getBucketOffsets( + stoppingOffsetsInitializer, + partitionName, + buckets, + bucketOffsetsRetriever) + val logPartitions = buckets.flatMap { + bucketId => 
+ val tableBucket = new TableBucket(tableId, partitionId, bucketId) + createLogTailPartition(tableBucket, tableBucketsOffset, stoppingOffsets(bucketId)) + } + + lakePartitions ++ logPartitions + + case None => + // Partition only in lake (expired in Fluss) — lake splits only + val pid = lakeSplitPartitionId + lakeSplitPartitionId -= 1 + createLakePartitions(splits, splitSerializer, tableId, Some(pid)) + } + }.toSeq + + // Partitions only in Fluss (not yet tiered) — log from earliest + val flussOnlyPartitions = flussPartitionIdByName.flatMap { + case (partitionName, partitionId) => + val stoppingOffsets = getBucketOffsets( + stoppingOffsetsInitializer, + partitionName, + buckets, + bucketOffsetsRetriever) + buckets.flatMap { + bucketId => + val stoppingOffset = stoppingOffsets(bucketId) + if (stoppingOffset > 0) { + val tableBucket = new TableBucket(tableId, partitionId, bucketId) + Some( + FlussAppendInputPartition( + tableBucket, + LogScanner.EARLIEST_OFFSET, + stoppingOffset): InputPartition) + } else { + None + } + } + }.toSeq + + (lakeAndLogPartitions ++ flussOnlyPartitions).toArray + } + + private def groupLakeSplitsByPartition( + lakeSplits: Seq[LakeSplit]): mutable.LinkedHashMap[String, mutable.ArrayBuffer[LakeSplit]] = { + val grouped = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[LakeSplit]] + lakeSplits.foreach { + split => + val partitionName = if (split.partition() == null || split.partition().isEmpty) { + "" + } else { + split.partition().asScala.mkString(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR) + } + grouped.getOrElseUpdate(partitionName, mutable.ArrayBuffer.empty) += split + } + grouped + } + + private def createLakePartitions( + splits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit], + tableId: Long, + partitionId: Option[Long]): Seq[InputPartition] = { + splits.map { + split => + val tableBucket = partitionId match { + case Some(pid) => new TableBucket(tableId, pid, split.bucket()) + case None => new 
TableBucket(tableId, split.bucket()) + } + FlussLakeInputPartition(tableBucket, splitSerializer.serialize(split)) + } + } + + private def createLogTailPartition( + tableBucket: TableBucket, + tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], + stoppingOffset: Long): Option[InputPartition] = { + val snapshotLogOffset = tableBucketsOffset.get(tableBucket) + if (snapshotLogOffset != null) { + if (snapshotLogOffset.longValue() < stoppingOffset) { + Some(FlussAppendInputPartition(tableBucket, snapshotLogOffset.longValue(), stoppingOffset)) + } else { + None + } + } else if (stoppingOffset > 0) { + Some(FlussAppendInputPartition(tableBucket, LogScanner.EARLIEST_OFFSET, stoppingOffset)) + } else { + None + } + } + + private def planFallbackPartitions(): Array[InputPartition] = { + val fallbackStartInit = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig) + val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath) + val buckets = (0 until tableInfo.getNumBuckets).toSeq + val tableId = tableInfo.getTableId + + def createPartitions( + partitionId: Option[Long], + partitionName: String): Array[InputPartition] = { + val startOffsets = + getBucketOffsets(fallbackStartInit, partitionName, buckets, bucketOffsetsRetriever) + val stoppingOffsets = + getBucketOffsets(stoppingOffsetsInitializer, partitionName, buckets, bucketOffsetsRetriever) + + buckets.map { + bucketId => + val tableBucket = partitionId match { + case Some(pid) => new TableBucket(tableId, pid, bucketId) + case None => new TableBucket(tableId, bucketId) + } + FlussAppendInputPartition( + tableBucket, + startOffsets(bucketId), + stoppingOffsets(bucketId) + ): InputPartition + }.toArray + } + + if (tableInfo.isPartitioned) { + partitionInfos.asScala.flatMap { + pi => createPartitions(Some(pi.getPartitionId), pi.getPartitionName) + }.toArray + } else { + createPartitions(None, null) + } + } +} diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala similarity index 92% rename from fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReader.scala rename to fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala index ebd2f37602..4fa526ed46 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReader.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReader.scala @@ -20,7 +20,6 @@ package org.apache.fluss.spark.read.lake import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.TablePath import org.apache.fluss.record.LogRecord -import org.apache.fluss.spark.read.FlussLakeInputPartition import org.apache.fluss.spark.row.DataConverter import org.apache.fluss.types.RowType import org.apache.fluss.utils.CloseableIterator @@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.PartitionReader /** Partition reader that reads data from a single lake split via lake storage (no Fluss connection). 
*/ -class FlussLakePartitionReader( +class FlussLakeAppendPartitionReader( tablePath: TablePath, rowType: RowType, partition: FlussLakeInputPartition, @@ -51,9 +50,7 @@ class FlussLakePartitionReader( val split = splitSerializer.deserialize(splitSerializer.getVersion, partition.lakeSplitBytes) recordIterator = lakeSource - .createRecordReader(new LakeSource.ReaderContext[LakeSplit] { - override def lakeSplit(): LakeSplit = split - }) + .createRecordReader(() => split) .read() } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReaderFactory.scala similarity index 88% rename from fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala rename to fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReaderFactory.scala index 595c8c0e14..809e68dbde 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendPartitionReaderFactory.scala @@ -20,7 +20,7 @@ package org.apache.fluss.spark.read.lake import org.apache.fluss.config.Configuration import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.TablePath -import org.apache.fluss.spark.read.{FlussAppendInputPartition, FlussAppendPartitionReader, FlussLakeInputPartition} +import org.apache.fluss.spark.read.{FlussAppendInputPartition, FlussAppendPartitionReader} import org.apache.fluss.types.RowType import org.apache.spark.sql.catalyst.InternalRow @@ -38,8 +38,8 @@ class FlussLakeAppendPartitionReaderFactory( extends PartitionReaderFactory { @transient private lazy val lakeSource: LakeSource[LakeSplit] = { - val source = 
FlussLakeSourceUtils.createLakeSource(tableProperties, tablePath) - source.withProject(FlussLakeSourceUtils.lakeProjection(projection)) + val source = FlussLakeUtils.createLakeSource(tableProperties, tablePath) + source.withProject(FlussLakeUtils.lakeProjection(projection)) source } @@ -48,7 +48,7 @@ class FlussLakeAppendPartitionReaderFactory( override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { partition match { case lake: FlussLakeInputPartition => - new FlussLakePartitionReader(tablePath, projectedRowType, lake, lakeSource) + new FlussLakeAppendPartitionReader(tablePath, projectedRowType, lake, lakeSource) case log: FlussAppendInputPartition => new FlussAppendPartitionReader(tablePath, projection, log, flussConfig) case _ => diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala index a7b11c8760..20ff096d52 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeBatch.scala @@ -18,24 +18,17 @@ package org.apache.fluss.spark.read.lake import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer} -import org.apache.fluss.client.table.scanner.log.LogScanner import org.apache.fluss.config.Configuration -import org.apache.fluss.exception.LakeTableSnapshotNotExistException -import org.apache.fluss.lake.serializer.SimpleVersionedSerializer -import org.apache.fluss.lake.source.{LakeSource, LakeSplit} -import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath} +import org.apache.fluss.metadata.{TableInfo, TablePath} import org.apache.fluss.spark.read._ -import org.apache.fluss.utils.ExceptionUtils -import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory} +import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import scala.collection.JavaConverters._ -import scala.collection.mutable -/** Batch for reading lake-enabled log table (append-only table with datalake). */ -class FlussLakeAppendBatch( +abstract class FlussLakeBatch( tablePath: TablePath, tableInfo: TableInfo, readSchema: StructType, @@ -43,226 +36,21 @@ class FlussLakeAppendBatch( flussConfig: Configuration) extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) { - // Required by FlussBatch but unused — lake snapshot determines start offsets. - override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest() - override val stoppingOffsetsInitializer: OffsetsInitializer = { FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, flussConfig) } - private lazy val (partitions, isFallback) = doPlan() + lazy val (partitions, isFallback) = doPlan() override def planInputPartitions(): Array[InputPartition] = partitions - override def createReaderFactory(): PartitionReaderFactory = { - if (isFallback) { - new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig) - } else { - new FlussLakeAppendPartitionReaderFactory( - tableInfo.getProperties.toMap, - tablePath, - tableInfo.getRowType, - projection, - flussConfig) - } - } - /** * Plans input partitions for reading. The returned isFallback flag is true when no lake snapshot * exists and the plan falls back to pure log reading. 
*/ - private def doPlan(): (Array[InputPartition], Boolean) = { - val lakeSnapshot = - try { - admin.getReadableLakeSnapshot(tablePath).get() - } catch { - case e: Exception => - if ( - ExceptionUtils - .stripExecutionException(e) - .isInstanceOf[LakeTableSnapshotNotExistException] - ) { - return (planFallbackPartitions(), true) - } - throw e - } - - val lakeSource = FlussLakeSourceUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) - lakeSource.withProject(FlussLakeSourceUtils.lakeProjection(projection)) - - val lakeSplits = lakeSource - .createPlanner(new LakeSource.PlannerContext { - override def snapshotId(): Long = lakeSnapshot.getSnapshotId - }) - .plan() - - val splitSerializer = lakeSource.getSplitSerializer - val tableBucketsOffset = lakeSnapshot.getTableBucketsOffset - val buckets = (0 until tableInfo.getNumBuckets).toSeq - val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath) - - val partitions = if (tableInfo.isPartitioned) { - planPartitionedTable( - lakeSplits.asScala, - splitSerializer, - tableBucketsOffset, - buckets, - bucketOffsetsRetriever) - } else { - planNonPartitionedTable( - lakeSplits.asScala, - splitSerializer, - tableBucketsOffset, - buckets, - bucketOffsetsRetriever) - } - - (partitions, false) - } - - private def planNonPartitionedTable( - lakeSplits: Seq[LakeSplit], - splitSerializer: SimpleVersionedSerializer[LakeSplit], - tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], - buckets: Seq[Int], - bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = { - val tableId = tableInfo.getTableId - - val lakePartitions = - createLakePartitions(lakeSplits, splitSerializer, tableId, partitionId = None) - - val stoppingOffsets = - getBucketOffsets(stoppingOffsetsInitializer, null, buckets, bucketOffsetsRetriever) - val logPartitions = buckets.flatMap { - bucketId => - val tableBucket = new TableBucket(tableId, bucketId) - createLogTailPartition(tableBucket, 
tableBucketsOffset, stoppingOffsets(bucketId)) - } - - (lakePartitions ++ logPartitions).toArray - } - - private def planPartitionedTable( - lakeSplits: Seq[LakeSplit], - splitSerializer: SimpleVersionedSerializer[LakeSplit], - tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], - buckets: Seq[Int], - bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = { - val tableId = tableInfo.getTableId - - val flussPartitionIdByName = mutable.LinkedHashMap.empty[String, Long] - partitionInfos.asScala.foreach { - pi => flussPartitionIdByName(pi.getPartitionName) = pi.getPartitionId - } - - val lakeSplitsByPartition = groupLakeSplitsByPartition(lakeSplits) - var lakeSplitPartitionId = -1L + def doPlan(): (Array[InputPartition], Boolean) - val lakeAndLogPartitions = lakeSplitsByPartition.flatMap { - case (partitionName, splits) => - flussPartitionIdByName.remove(partitionName) match { - case Some(partitionId) => - // Partition in both lake and Fluss — lake splits + log tail - val lakePartitions = - createLakePartitions(splits, splitSerializer, tableId, Some(partitionId)) - - val stoppingOffsets = getBucketOffsets( - stoppingOffsetsInitializer, - partitionName, - buckets, - bucketOffsetsRetriever) - val logPartitions = buckets.flatMap { - bucketId => - val tableBucket = new TableBucket(tableId, partitionId, bucketId) - createLogTailPartition(tableBucket, tableBucketsOffset, stoppingOffsets(bucketId)) - } - - lakePartitions ++ logPartitions - - case None => - // Partition only in lake (expired in Fluss) — lake splits only - val pid = lakeSplitPartitionId - lakeSplitPartitionId -= 1 - createLakePartitions(splits, splitSerializer, tableId, Some(pid)) - } - }.toSeq - - // Partitions only in Fluss (not yet tiered) — log from earliest - val flussOnlyPartitions = flussPartitionIdByName.flatMap { - case (partitionName, partitionId) => - val stoppingOffsets = getBucketOffsets( - stoppingOffsetsInitializer, - partitionName, - buckets, - 
bucketOffsetsRetriever) - buckets.flatMap { - bucketId => - val stoppingOffset = stoppingOffsets(bucketId) - if (stoppingOffset > 0) { - val tableBucket = new TableBucket(tableId, partitionId, bucketId) - Some( - FlussAppendInputPartition( - tableBucket, - LogScanner.EARLIEST_OFFSET, - stoppingOffset): InputPartition) - } else { - None - } - } - }.toSeq - - (lakeAndLogPartitions ++ flussOnlyPartitions).toArray - } - - private def groupLakeSplitsByPartition( - lakeSplits: Seq[LakeSplit]): mutable.LinkedHashMap[String, mutable.ArrayBuffer[LakeSplit]] = { - val grouped = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[LakeSplit]] - lakeSplits.foreach { - split => - val partitionName = if (split.partition() == null || split.partition().isEmpty) { - "" - } else { - split.partition().asScala.mkString(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR) - } - grouped.getOrElseUpdate(partitionName, mutable.ArrayBuffer.empty) += split - } - grouped - } - - private def createLakePartitions( - splits: Seq[LakeSplit], - splitSerializer: SimpleVersionedSerializer[LakeSplit], - tableId: Long, - partitionId: Option[Long]): Seq[InputPartition] = { - splits.map { - split => - val tableBucket = partitionId match { - case Some(pid) => new TableBucket(tableId, pid, split.bucket()) - case None => new TableBucket(tableId, split.bucket()) - } - FlussLakeInputPartition(tableBucket, splitSerializer.serialize(split)) - } - } - - private def createLogTailPartition( - tableBucket: TableBucket, - tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], - stoppingOffset: Long): Option[InputPartition] = { - val snapshotLogOffset = tableBucketsOffset.get(tableBucket) - if (snapshotLogOffset != null) { - if (snapshotLogOffset.longValue() < stoppingOffset) { - Some(FlussAppendInputPartition(tableBucket, snapshotLogOffset.longValue(), stoppingOffset)) - } else { - None - } - } else if (stoppingOffset > 0) { - Some(FlussAppendInputPartition(tableBucket, LogScanner.EARLIEST_OFFSET, 
stoppingOffset)) - } else { - None - } - } - - private def getBucketOffsets( + def getBucketOffsets( initializer: OffsetsInitializer, partitionName: String, buckets: Seq[Int], @@ -273,41 +61,4 @@ class FlussLakeAppendBatch( .map(e => (e._1.intValue(), Long2long(e._2))) .toMap } - - private def planFallbackPartitions(): Array[InputPartition] = { - val fallbackStartInit = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig) - val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath) - val buckets = (0 until tableInfo.getNumBuckets).toSeq - val tableId = tableInfo.getTableId - - def createPartitions( - partitionId: Option[Long], - partitionName: String): Array[InputPartition] = { - val startOffsets = - getBucketOffsets(fallbackStartInit, partitionName, buckets, bucketOffsetsRetriever) - val stoppingOffsets = - getBucketOffsets(stoppingOffsetsInitializer, partitionName, buckets, bucketOffsetsRetriever) - - buckets.map { - bucketId => - val tableBucket = partitionId match { - case Some(pid) => new TableBucket(tableId, pid, bucketId) - case None => new TableBucket(tableId, bucketId) - } - FlussAppendInputPartition( - tableBucket, - startOffsets(bucketId), - stoppingOffsets(bucketId) - ): InputPartition - }.toArray - } - - if (tableInfo.isPartitioned) { - partitionInfos.asScala.flatMap { - pi => createPartitions(Some(pi.getPartitionId), pi.getPartitionName) - }.toArray - } else { - createPartitions(None, null) - } - } } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala index d1bc2f07f5..ccbe35b2ae 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertBatch.scala @@ -22,7 +22,7 @@ import 
org.apache.fluss.client.table.scanner.log.LogScanner import org.apache.fluss.config.Configuration import org.apache.fluss.exception.LakeTableSnapshotNotExistException import org.apache.fluss.lake.serializer.SimpleVersionedSerializer -import org.apache.fluss.lake.source.{LakeSource, LakeSplit} +import org.apache.fluss.lake.source.LakeSplit import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath} import org.apache.fluss.spark.read._ import org.apache.fluss.utils.ExceptionUtils @@ -31,8 +31,6 @@ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFacto import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap -import java.io.{ByteArrayOutputStream, DataOutputStream} - import scala.collection.JavaConverters._ import scala.collection.mutable @@ -46,7 +44,7 @@ class FlussLakeUpsertBatch( readSchema: StructType, options: CaseInsensitiveStringMap, flussConfig: Configuration) - extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) { + extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) { override val startOffsetsInitializer: OffsetsInitializer = { val offsetsInitializer = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig) @@ -56,14 +54,6 @@ class FlussLakeUpsertBatch( offsetsInitializer } - override val stoppingOffsetsInitializer: OffsetsInitializer = { - FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, flussConfig) - } - - private lazy val (partitions, isFallback) = doPlan() - - override def planInputPartitions(): Array[InputPartition] = partitions - override def createReaderFactory(): PartitionReaderFactory = { if (isFallback) { new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig) @@ -76,11 +66,7 @@ class FlussLakeUpsertBatch( } } - /** - * Plans input partitions for reading. 
The returned isFallback flag is true when no lake snapshot - * exists and the plan falls back to pure Fluss kv reading. - */ - private def doPlan(): (Array[InputPartition], Boolean) = { + override def doPlan(): (Array[InputPartition], Boolean) = { val lakeSnapshot = try { admin.getReadableLakeSnapshot(tablePath).get() @@ -96,13 +82,11 @@ class FlussLakeUpsertBatch( throw e } - val lakeSource = FlussLakeSourceUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) - lakeSource.withProject(FlussLakeSourceUtils.lakeProjection(projection)) + val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + lakeSource.withProject(FlussLakeUtils.lakeProjection(projection)) val lakeSplits = lakeSource - .createPlanner(new LakeSource.PlannerContext { - override def snapshotId(): Long = lakeSnapshot.getSnapshotId - }) + .createPlanner(() => lakeSnapshot.getSnapshotId) .plan() val splitSerializer = lakeSource.getSplitSerializer @@ -138,7 +122,7 @@ class FlussLakeUpsertBatch( getBucketOffsets(stoppingOffsetsInitializer, null, buckets, bucketOffsetsRetriever) // Group lake splits by bucket - val lakeSplitsByBucket = groupLakeSplitsByBucket(lakeSplits, None) + val lakeSplitsByBucket = lakeSplits.groupBy(_.bucket()).mapValues(_.toSeq).toMap buckets.map { bucketId => @@ -261,12 +245,6 @@ class FlussLakeUpsertBatch( grouped.toMap } - private def groupLakeSplitsByBucket( - lakeSplits: Seq[LakeSplit], - partitionId: Option[Long]): Map[Int, Seq[LakeSplit]] = { - lakeSplits.groupBy(_.bucket()).mapValues(_.toSeq).toMap - } - private def createLakeUpsertPartition( tableBucket: TableBucket, lakeSplits: Option[Seq[LakeSplit]], @@ -276,7 +254,7 @@ class FlussLakeUpsertBatch( val (lakeSplitBytes, logStartingOffset) = if (lakeSplits.isDefined && lakeSplits.get.nonEmpty) { // Serialize all lake splits for this bucket into a single byte array - val serialized = serializeLakeSplits(lakeSplits.get, splitSerializer) + val serialized = 
FlussLakeUtils.serializeLakeSplits(lakeSplits.get, splitSerializer) val startOffset = if (snapshotLogOffset != null) snapshotLogOffset.longValue() else LogScanner.EARLIEST_OFFSET @@ -294,39 +272,6 @@ class FlussLakeUpsertBatch( ) } - private def serializeLakeSplits( - lakeSplits: Seq[LakeSplit], - splitSerializer: SimpleVersionedSerializer[LakeSplit]): Array[Byte] = { - val baos = new ByteArrayOutputStream() - val dos = new DataOutputStream(baos) - - // Write serializer version - dos.writeInt(splitSerializer.getVersion) - // Write number of splits - dos.writeInt(lakeSplits.size) - // Write each split - for (split <- lakeSplits) { - val serialized = splitSerializer.serialize(split) - dos.writeInt(serialized.length) - dos.write(serialized) - } - - dos.flush() - baos.toByteArray - } - - private def getBucketOffsets( - initializer: OffsetsInitializer, - partitionName: String, - buckets: Seq[Int], - bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Map[Int, Long] = { - initializer - .getBucketOffsets(partitionName, buckets.map(Integer.valueOf).asJava, bucketOffsetsRetriever) - .asScala - .map(e => (e._1.intValue(), Long2long(e._2))) - .toMap - } - private def planFallbackPartitions(): Array[InputPartition] = { // Fallback to pure Fluss kv reading when no lake snapshot exists val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala index 9dd77d43db..21514809f5 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReader.scala @@ -22,13 +22,10 @@ import org.apache.fluss.config.Configuration import 
org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.TablePath import org.apache.fluss.row.InternalRow -import org.apache.fluss.spark.read.{FlussLakeUpsertInputPartition, FlussPartitionReader} +import org.apache.fluss.spark.read.FlussPartitionReader import org.apache.spark.internal.Logging -import java.io.{ByteArrayInputStream, DataInputStream} -import java.util.ArrayList - import scala.collection.JavaConverters._ /** @@ -47,7 +44,9 @@ class FlussLakeUpsertPartitionReader( with Logging { private val lakeSplits = if (flussPartition.lakeSplitBytes != null) { - deserializeLakeSplits(flussPartition.lakeSplitBytes, lakeSource.getSplitSerializer) + FlussLakeUtils.deserializeLakeSplits( + flussPartition.lakeSplitBytes, + lakeSource.getSplitSerializer) } else { null } @@ -86,30 +85,6 @@ class FlussLakeUpsertPartitionReader( false } - private def deserializeLakeSplits( - bytes: Array[Byte], - splitSerializer: org.apache.fluss.lake.serializer.SimpleVersionedSerializer[LakeSplit]) - : java.util.List[LakeSplit] = { - val bais = new ByteArrayInputStream(bytes) - val dis = new DataInputStream(bais) - - // Read serializer version - val version = dis.readInt() - // Read number of splits - val numSplits = dis.readInt() - val splits = new ArrayList[LakeSplit](numSplits) - - // Read each split - for (_ <- 0 until numSplits) { - val length = dis.readInt() - val serialized = new Array[Byte](length) - dis.readFully(serialized) - splits.add(splitSerializer.deserialize(version, serialized)) - } - - splits - } - private def initialize(): Unit = { val currentTs = System.currentTimeMillis() logInfo(s"Prepare read lake-enabled pk table $tablePath $flussPartition") diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala index f09074026b..525e29ec7f 100644 --- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUpsertPartitionReaderFactory.scala @@ -20,7 +20,6 @@ package org.apache.fluss.spark.read.lake import org.apache.fluss.config.Configuration import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.TablePath -import org.apache.fluss.spark.read.FlussLakeUpsertInputPartition import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} @@ -39,7 +38,7 @@ class FlussLakeUpsertPartitionReaderFactory( extends PartitionReaderFactory { @transient private lazy val lakeSource: LakeSource[LakeSplit] = { - FlussLakeSourceUtils.createLakeSource(tableProperties, tablePath) + FlussLakeUtils.createLakeSource(tableProperties, tablePath) } override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeSourceUtils.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala similarity index 56% rename from fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeSourceUtils.scala rename to fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala index 8fb4ccf6bc..a6dce8b08d 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeSourceUtils.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeUtils.scala @@ -19,14 +19,16 @@ package org.apache.fluss.spark.read.lake import org.apache.fluss.config.{ConfigOptions, Configuration} import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp +import 
org.apache.fluss.lake.serializer.SimpleVersionedSerializer import org.apache.fluss.lake.source.{LakeSource, LakeSplit} import org.apache.fluss.metadata.TablePath import org.apache.fluss.utils.PropertiesUtils +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} import java.util +import java.util.ArrayList -/** Shared utilities for creating lake sources and projections. */ -object FlussLakeSourceUtils { +object FlussLakeUtils { def createLakeSource( tableProperties: util.Map[String, String], @@ -46,4 +48,49 @@ object FlussLakeSourceUtils { def lakeProjection(projection: Array[Int]): Array[Array[Int]] = { projection.map(i => Array(i)) } + + def serializeLakeSplits( + lakeSplits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit]): Array[Byte] = { + val baos = new ByteArrayOutputStream() + val dos = new DataOutputStream(baos) + + // Write serializer version + dos.writeInt(splitSerializer.getVersion) + // Write number of splits + dos.writeInt(lakeSplits.size) + // Write each split + for (split <- lakeSplits) { + val serialized = splitSerializer.serialize(split) + dos.writeInt(serialized.length) + dos.write(serialized) + } + + dos.flush() + baos.toByteArray + } + + def deserializeLakeSplits( + bytes: Array[Byte], + splitSerializer: org.apache.fluss.lake.serializer.SimpleVersionedSerializer[LakeSplit]) + : java.util.List[LakeSplit] = { + val bais = new ByteArrayInputStream(bytes) + val dis = new DataInputStream(bais) + + // Read serializer version + val version = dis.readInt() + // Read number of splits + val numSplits = dis.readInt() + val splits = new ArrayList[LakeSplit](numSplits) + + // Read each split + for (_ <- 0 until numSplits) { + val length = dis.readInt() + val serialized = new Array[Byte](length) + dis.readFully(serialized) + splits.add(splitSerializer.deserialize(version, serialized)) + } + + splits + } } diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala index 34ab0cd5f9..11287fb7d4 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala @@ -63,7 +63,6 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession { super.beforeAll() sql(s"USE $DEFAULT_DATABASE") - spark.sparkContext.setLogLevel("INFO") } override protected def afterAll(): Unit = { From 894dae18b47f155abc10cdf2deb8d278d532ec57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Thu, 9 Apr 2026 17:25:12 +0800 Subject: [PATCH 4/7] remove duplicate code --- .../spark/read/FlussInputPartition.scala | 44 ------------------- ...SparkLakePrimaryKeyTableReadTestBase.scala | 4 +- 2 files changed, 2 insertions(+), 46 deletions(-) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala index 9ae5bf1f52..648eb43888 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala @@ -45,23 +45,6 @@ case class FlussAppendInputPartition(tableBucket: TableBucket, startOffset: Long } } -/** - * Represents an input partition for reading data from a single lake split. Each lake split maps to - * one Spark task, enabling parallel lake reads across splits. 
- * - * @param tableBucket - * the table bucket this split belongs to - * @param lakeSplitBytes - * serialized lake split data - */ -case class FlussLakeInputPartition(tableBucket: TableBucket, lakeSplitBytes: Array[Byte]) - extends FlussInputPartition { - override def toString: String = { - s"FlussLakeInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," + - s" partitionId=${tableBucket.getPartitionId}," + - s" splitSize=${lakeSplitBytes.length}}" - } -} /** * Represents an input partition for reading data from a primary key table bucket. This partition @@ -88,30 +71,3 @@ case class FlussUpsertInputPartition( s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset}" } } - -/** - * Represents an input partition for reading data from a lake-enabled primary key table bucket. This - * partition combines lake snapshot splits with Fluss log tail for hybrid lake-kv reading. - * - * @param tableBucket - * the table bucket to read from - * @param lakeSplitBytes - * serialized lake splits data (may be null if no lake snapshot) - * @param logStartingOffset - * the log offset where incremental reading should start - * @param logStoppingOffset - * the log offset where incremental reading should end - */ -case class FlussLakeUpsertInputPartition( - tableBucket: TableBucket, - lakeSplitBytes: Array[Byte], - logStartingOffset: Long, - logStoppingOffset: Long) - extends FlussInputPartition { - override def toString: String = { - s"FlussLakeUpsertInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," + - s" partitionId=${tableBucket.getPartitionId}," + - s" lakeSplitSize=${if (lakeSplitBytes != null) lakeSplitBytes.length else 0}," + - s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset}" - } -} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala index b0f3412f35..d1e47f9d93 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala @@ -329,7 +329,7 @@ abstract class SparkLakePrimaryKeyTableReadTestBase extends SparkLakeTableReadTe } } -class SparkLakePaimonPrimaryKeyTableReadTestBase extends SparkLakePrimaryKeyTableReadTestBase { +class SparkLakePaimonPrimaryKeyTableReadTest extends SparkLakePrimaryKeyTableReadTestBase { override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.PAIMON @@ -352,7 +352,7 @@ class SparkLakePaimonPrimaryKeyTableReadTestBase extends SparkLakePrimaryKeyTabl } } -class SparkLakeIcebergPrimaryKeyTableReadTestBase extends SparkLakePrimaryKeyTableReadTestBase { +class SparkLakeIcebergPrimaryKeyTableReadTest extends SparkLakePrimaryKeyTableReadTestBase { override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.ICEBERG From d55f761fb060a59dc574850819e9200a80e9d286 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Thu, 9 Apr 2026 18:33:29 +0800 Subject: [PATCH 5/7] fix style --- .../scala/org/apache/fluss/spark/read/FlussInputPartition.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala index 648eb43888..661f8779f4 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala @@ -45,7 +45,6 @@ case class FlussAppendInputPartition(tableBucket: TableBucket, startOffset: Long } } - /** * 
Represents an input partition for reading data from a primary key table bucket. This partition * includes snapshot information for hybrid snapshot-log reading. From 0b75de5afb2d81479512ffd3e0c12e673f55130f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Thu, 9 Apr 2026 18:57:21 +0800 Subject: [PATCH 6/7] ignore iceberg suite --- ...SparkLakePrimaryKeyTableReadTestBase.scala | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala index d1e47f9d93..e8a4d23f59 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala @@ -351,25 +351,3 @@ class SparkLakePaimonPrimaryKeyTableReadTest extends SparkLakePrimaryKeyTableRea conf } } - -class SparkLakeIcebergPrimaryKeyTableReadTest extends SparkLakePrimaryKeyTableReadTestBase { - - override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.ICEBERG - - override protected def flussConf: Configuration = { - val conf = super.flussConf - conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString) - conf.setString("datalake.iceberg.type", "hadoop") - warehousePath = - Files.createTempDirectory("fluss-testing-iceberg-pk-lake-read").resolve("warehouse").toString - conf.setString("datalake.iceberg.warehouse", warehousePath) - conf - } - - override protected def lakeCatalogConf: Configuration = { - val conf = new Configuration() - conf.setString("type", "hadoop") - conf.setString("warehouse", warehousePath) - conf - } -} From e3109d6a74879cff839189f36f3c71e091236d59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Fri, 10 Apr 2026 09:48:30 +0800 Subject: 
[PATCH 7/7] trigger CI