Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@
* limitations under the License.
*/

package org.apache.fluss.flink.lake.reader;
package org.apache.fluss.client.table.scanner.batch;

import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.client.table.scanner.SortMergeReader;
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.ScanRecords;
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.lake.source.RecordReader;
Expand Down Expand Up @@ -53,14 +51,14 @@
/** A scanner to merge the lakehouse's snapshot and change log. */
public class LakeSnapshotAndLogSplitScanner implements BatchScanner {

private final LakeSnapshotAndFlussLogSplit lakeSnapshotSplitAndFlussLogSplit;
private final @Nullable List<LakeSplit> lakeSplits;
private Comparator<InternalRow> rowComparator;
private List<CloseableIterator<LogRecord>> lakeRecordIterators = new ArrayList<>();
private final LakeSource<LakeSplit> lakeSource;

private final int[] pkIndexes;

// the indexes of primary key in emitted row by paimon and fluss
// the indexes of primary key in emitted row by lake and fluss
private int[] keyIndexesInRow;
@Nullable private int[] adjustProjectedFields;

Expand All @@ -76,11 +74,15 @@ public class LakeSnapshotAndLogSplitScanner implements BatchScanner {
public LakeSnapshotAndLogSplitScanner(
Table table,
LakeSource<LakeSplit> lakeSource,
LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit,
@Nullable List<LakeSplit> lakeSplits,
TableBucket tableBucket,
long startingOffset,
long stoppingOffset,
@Nullable int[] projectedFields) {
this.pkIndexes = table.getTableInfo().getSchema().getPrimaryKeyIndexes();
this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit;
this.lakeSplits = lakeSplits;
this.lakeSource = lakeSource;
this.stoppingOffset = stoppingOffset;
int[] newProjectedFields = getNeedProjectFields(table, projectedFields);

this.logScanner = table.newScan().project(newProjectedFields).createLogScanner();
Expand All @@ -89,29 +91,14 @@ public LakeSnapshotAndLogSplitScanner(
.mapToObj(field -> new int[] {field})
.toArray(int[][]::new));

TableBucket tableBucket = lakeSnapshotAndFlussLogSplit.getTableBucket();
if (tableBucket.getPartitionId() != null) {
this.logScanner.subscribe(
tableBucket.getPartitionId(),
tableBucket.getBucket(),
lakeSnapshotAndFlussLogSplit.getStartingOffset());
tableBucket.getPartitionId(), tableBucket.getBucket(), startingOffset);
} else {
this.logScanner.subscribe(
tableBucket.getBucket(), lakeSnapshotAndFlussLogSplit.getStartingOffset());
this.logScanner.subscribe(tableBucket.getBucket(), startingOffset);
}

this.stoppingOffset =
lakeSnapshotAndFlussLogSplit
.getStoppingOffset()
.orElseThrow(
() ->
new RuntimeException(
"StoppingOffset is null for split: "
+ lakeSnapshotAndFlussLogSplit));

this.logScanFinished =
lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset
|| stoppingOffset <= 0;
this.logScanFinished = startingOffset >= stoppingOffset || stoppingOffset <= 0;
}

private int[] getNeedProjectFields(Table flussTable, @Nullable int[] projectedFields) {
Expand Down Expand Up @@ -169,11 +156,10 @@ private int findIndex(int[] array, int target) {
public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
if (logScanFinished) {
if (lakeRecordIterators.isEmpty()) {
if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
|| lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
if (lakeSplits == null || lakeSplits.isEmpty()) {
lakeRecordIterators = Collections.emptyList();
} else {
for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
for (LakeSplit lakeSplit : lakeSplits) {
lakeRecordIterators.add(
lakeSource.createRecordReader(() -> lakeSplit).read());
}
Expand All @@ -195,12 +181,11 @@ public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOExcep
} else {
if (lakeRecordIterators.isEmpty()) {
List<RecordReader> recordReaders = new ArrayList<>();
if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
|| lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
if (lakeSplits == null || lakeSplits.isEmpty()) {
// pass null split to get rowComparator
recordReaders.add(lakeSource.createRecordReader(() -> null));
} else {
for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
for (LakeSplit lakeSplit : lakeSplits) {
recordReaders.add(lakeSource.createRecordReader(() -> lakeSplit));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.flink.lake.reader.LakeSnapshotAndLogSplitScanner;
import org.apache.fluss.client.table.scanner.batch.LakeSnapshotAndLogSplitScanner;
import org.apache.fluss.flink.lake.reader.LakeSnapshotScanner;
import org.apache.fluss.flink.lake.reader.SeekableLakeSnapshotSplitScanner;
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
Expand Down Expand Up @@ -101,9 +101,23 @@ private BatchScanner getBatchScanner(LakeSnapshotAndFlussLogSplit lakeSplit) {
lakeSplit.getCurrentLakeSplitIndex());

} else {
long stoppingOffset =
lakeSplit
.getStoppingOffset()
.orElseThrow(
() ->
new RuntimeException(
"StoppingOffset is null for split: "
+ lakeSplit));
lakeBatchScanner =
new LakeSnapshotAndLogSplitScanner(
table, lakeSource, lakeSplit, projectedFields);
table,
lakeSource,
lakeSplit.getLakeSplits(),
lakeSplit.getTableBucket(),
lakeSplit.getStartingOffset(),
stoppingOffset,
projectedFields);
}
return lakeBatchScanner;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.fluss.client.admin.Admin
import org.apache.fluss.config.{Configuration => FlussConfiguration}
import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement}
import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussLakeAppendScanBuilder, FlussUpsertScanBuilder}
import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussLakeAppendScanBuilder, FlussLakeUpsertScanBuilder, FlussUpsertScanBuilder}
import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBuilder}

import org.apache.spark.sql.catalyst.SQLConfHelper
Expand Down Expand Up @@ -75,7 +75,11 @@ class SparkTable(
new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
}
} else {
new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
if (isDataLakeEnabled) {
new FlussLakeUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
} else {
new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,6 @@ case class FlussAppendInputPartition(tableBucket: TableBucket, startOffset: Long
}
}

/**
* Represents an input partition for reading data from a single lake split. Each lake split maps to
* one Spark task, enabling parallel lake reads across splits.
*
* @param tableBucket
* the table bucket this split belongs to
* @param lakeSplitBytes
* serialized lake split data
*/
case class FlussLakeInputPartition(tableBucket: TableBucket, lakeSplitBytes: Array[Byte])
extends FlussInputPartition {
override def toString: String = {
s"FlussLakeInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," +
s" partitionId=${tableBucket.getPartitionId}," +
s" splitSize=${lakeSplitBytes.length}}"
}
}

/**
* Represents an input partition for reading data from a primary key table bucket. This partition
* includes snapshot information for hybrid snapshot-log reading.
Expand All @@ -85,6 +67,6 @@ case class FlussUpsertInputPartition(
override def toString: String = {
s"FlussUpsertInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," +
s" partitionId=${tableBucket.getPartitionId}, snapshotId=$snapshotId," +
s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset"
s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.fluss.spark.read
import org.apache.fluss.config.Configuration
import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.spark.SparkConversions
import org.apache.fluss.spark.read.lake.{FlussLakeAppendBatch, FlussLakeUpsertBatch}

import org.apache.spark.sql.connector.read.{Batch, Scan}
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
Expand Down Expand Up @@ -108,3 +109,27 @@ case class FlussUpsertScan(
checkpointLocation)
}
}

/** Fluss Lake Upsert Scan for lake-enabled primary key tables. */
case class FlussLakeUpsertScan(
tablePath: TablePath,
tableInfo: TableInfo,
requiredSchema: Option[StructType],
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussScan {

override def toBatch: Batch = {
new FlussLakeUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig)
}

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
new FlussUpsertMicroBatchStream(
tablePath,
tableInfo,
readSchema,
options,
flussConfig,
checkpointLocation)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,16 @@ class FlussUpsertScanBuilder(
FlussUpsertScan(tablePath, tableInfo, requiredSchema, options, flussConfig)
}
}

/** Fluss Lake Upsert Scan Builder for lake-enabled primary key tables. */
class FlussLakeUpsertScanBuilder(
tablePath: TablePath,
tableInfo: TableInfo,
options: CaseInsensitiveStringMap,
flussConfig: FlussConfiguration)
extends FlussScanBuilder {

override def build(): Scan = {
FlussLakeUpsertScan(tablePath, tableInfo, requiredSchema, options, flussConfig)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.fluss.spark.read
package org.apache.fluss.spark.read.lake

import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
import org.apache.fluss.client.table.scanner.log.LogScanner
Expand All @@ -24,6 +24,7 @@ import org.apache.fluss.exception.LakeTableSnapshotNotExistException
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer
import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath}
import org.apache.fluss.spark.read._
import org.apache.fluss.utils.ExceptionUtils

import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
Expand All @@ -40,19 +41,11 @@ class FlussLakeAppendBatch(
readSchema: StructType,
options: CaseInsensitiveStringMap,
flussConfig: Configuration)
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
extends FlussLakeBatch(tablePath, tableInfo, readSchema, options, flussConfig) {

// Required by FlussBatch but unused — lake snapshot determines start offsets.
// Required by FlussLakeBatch but unused — lake snapshot determines start offsets.
override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest()

override val stoppingOffsetsInitializer: OffsetsInitializer = {
FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, flussConfig)
}

private lazy val (partitions, isFallback) = doPlan()

override def planInputPartitions(): Array[InputPartition] = partitions

override def createReaderFactory(): PartitionReaderFactory = {
if (isFallback) {
new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig)
Expand All @@ -66,11 +59,7 @@ class FlussLakeAppendBatch(
}
}

/**
* Plans input partitions for reading. The returned isFallback flag is true when no lake snapshot
* exists and the plan falls back to pure log reading.
*/
private def doPlan(): (Array[InputPartition], Boolean) = {
override def doPlan(): (Array[InputPartition], Boolean) = {
val lakeSnapshot =
try {
admin.getReadableLakeSnapshot(tablePath).get()
Expand All @@ -86,8 +75,8 @@ class FlussLakeAppendBatch(
throw e
}

val lakeSource = FlussLakeSourceUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
lakeSource.withProject(FlussLakeSourceUtils.lakeProjection(projection))
val lakeSource = FlussLakeUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
lakeSource.withProject(FlussLakeUtils.lakeProjection(projection))

val lakeSplits = lakeSource
.createPlanner(new LakeSource.PlannerContext {
Expand Down Expand Up @@ -261,18 +250,6 @@ class FlussLakeAppendBatch(
}
}

private def getBucketOffsets(
initializer: OffsetsInitializer,
partitionName: String,
buckets: Seq[Int],
bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Map[Int, Long] = {
initializer
.getBucketOffsets(partitionName, buckets.map(Integer.valueOf).asJava, bucketOffsetsRetriever)
.asScala
.map(e => (e._1.intValue(), Long2long(e._2)))
.toMap
}

private def planFallbackPartitions(): Array[InputPartition] = {
val fallbackStartInit = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.fluss.spark.read
package org.apache.fluss.spark.read.lake

import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
import org.apache.fluss.metadata.TablePath
Expand All @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReader

/** Partition reader that reads data from a single lake split via lake storage (no Fluss connection). */
class FlussLakePartitionReader(
class FlussLakeAppendPartitionReader(
tablePath: TablePath,
rowType: RowType,
partition: FlussLakeInputPartition,
Expand All @@ -50,9 +50,7 @@ class FlussLakePartitionReader(
val split = splitSerializer.deserialize(splitSerializer.getVersion, partition.lakeSplitBytes)

recordIterator = lakeSource
.createRecordReader(new LakeSource.ReaderContext[LakeSplit] {
override def lakeSplit(): LakeSplit = split
})
.createRecordReader(() => split)
.read()
}

Expand Down
Loading
Loading