From d0c6e944f12dd42540c5585c61721913d2176b42 Mon Sep 17 00:00:00 2001 From: fhan Date: Fri, 3 Apr 2026 16:33:28 +0800 Subject: [PATCH] support lance union read --- .../fluss/lake/lance/LanceLakeStorage.java | 3 +- .../lake/lance/source/LanceLakeSource.java | 101 ++++++ .../lance/source/LancePredicatePushDown.java | 204 ++++++++++++ .../lake/lance/source/LanceRecordReader.java | 310 ++++++++++++++++++ .../fluss/lake/lance/source/LanceSplit.java | 97 ++++++ .../lake/lance/source/LanceSplitPlanner.java | 88 +++++ .../lance/source/LanceSplitSerializer.java | 87 +++++ .../lance/source/LanceRecordReaderTest.java | 127 +++++++ .../lance/source/LanceSourceTestBase.java | 190 +++++++++++ .../lance/source/LanceSplitPlannerTest.java | 83 +++++ .../source/LanceSplitSerializerTest.java | 61 ++++ 11 files changed, 1350 insertions(+), 1 deletion(-) create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/source/LanceLakeSource.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/source/LancePredicatePushDown.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/source/LanceRecordReader.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/source/LanceSplit.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/source/LanceSplitPlanner.java create mode 100644 fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/source/LanceSplitSerializer.java create mode 100644 fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceRecordReaderTest.java create mode 100644 fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSourceTestBase.java create mode 100644 fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSplitPlannerTest.java create mode 100644 
fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSplitSerializerTest.java diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeStorage.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeStorage.java index 3cadec74ad..6ac99cf03b 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeStorage.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeStorage.java @@ -19,6 +19,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.lake.lakestorage.LakeStorage; +import org.apache.fluss.lake.lance.source.LanceLakeSource; import org.apache.fluss.lake.lance.tiering.LanceCommittable; import org.apache.fluss.lake.lance.tiering.LanceLakeTieringFactory; import org.apache.fluss.lake.lance.tiering.LanceWriteResult; @@ -46,6 +47,6 @@ public LanceLakeCatalog createLakeCatalog() { @Override public LakeSource createLakeSource(TablePath tablePath) { - throw new UnsupportedOperationException("Not implemented"); + return new LanceLakeSource(config, tablePath); } } diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/source/LanceLakeSource.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/source/LanceLakeSource.java new file mode 100644 index 0000000000..0ec700b449 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/source/LanceLakeSource.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.fluss.lake.lance.source;

import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.Planner;
import org.apache.fluss.lake.source.RecordReader;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * {@link LakeSource} implementation for reading Lance lake tables.
 *
 * <p>Projection, limit and filter push-down are collected through the {@code with*} methods and
 * handed to the {@link LanceSplitPlanner} / {@link LanceRecordReader} instances this source
 * creates.
 */
public class LanceLakeSource implements LakeSource<LanceSplit> {
    private static final long serialVersionUID = 1L;

    private final Configuration configuration;
    private final TablePath tablePath;

    // Push-down state; each field stays null until the corresponding with* method is called.
    private @Nullable int[][] project;
    private @Nullable Integer limit;
    private @Nullable String filterSql;

    public LanceLakeSource(Configuration configuration, TablePath tablePath) {
        this.configuration = configuration;
        this.tablePath = tablePath;
    }

    @Override
    public void withProject(int[][] project) {
        this.project = project;
    }

    @Override
    public void withLimit(int limit) {
        this.limit = limit;
    }

    @Override
    public FilterPushDownResult withFilters(List<Predicate> predicates) {
        List<Predicate> acceptedPredicates = new ArrayList<>();
        List<Predicate> remainingPredicates = new ArrayList<>();
        List<String> sqlParts = new ArrayList<>();

        for (Predicate predicate : predicates) {
            Optional<String> sql = LancePredicatePushDown.toSql(predicate);
            if (sql.isPresent()) {
                acceptedPredicates.add(predicate);
                // Parenthesize each conjunct so operator precedence cannot change semantics
                // when the parts are joined with AND below.
                sqlParts.add("(" + sql.get() + ")");
            } else {
                remainingPredicates.add(predicate);
            }
        }

        this.filterSql = sqlParts.isEmpty() ? null : String.join(" AND ", sqlParts);
        return FilterPushDownResult.of(acceptedPredicates, remainingPredicates);
    }

    @Override
    public Planner<LanceSplit> createPlanner(PlannerContext context) {
        return new LanceSplitPlanner(configuration, tablePath, context.snapshotId(), limit);
    }

    @Override
    public RecordReader createRecordReader(ReaderContext context) throws IOException {
        return new LanceRecordReader(
                configuration, tablePath, context.lakeSplit(), project, filterSql);
    }

    @Override
    public SimpleVersionedSerializer<LanceSplit> getSplitSerializer() {
        return new LanceSplitSerializer();
    }
}
package org.apache.fluss.lake.lance.source;

import org.apache.fluss.predicate.And;
import org.apache.fluss.predicate.CompoundPredicate;
import org.apache.fluss.predicate.Equal;
import org.apache.fluss.predicate.GreaterOrEqual;
import org.apache.fluss.predicate.GreaterThan;
import org.apache.fluss.predicate.In;
import org.apache.fluss.predicate.IsNotNull;
import org.apache.fluss.predicate.IsNull;
import org.apache.fluss.predicate.LeafPredicate;
import org.apache.fluss.predicate.LessOrEqual;
import org.apache.fluss.predicate.LessThan;
import org.apache.fluss.predicate.NotEqual;
import org.apache.fluss.predicate.NotIn;
import org.apache.fluss.predicate.Or;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Converts Fluss {@link Predicate}s into Lance SQL filter expressions.
 *
 * <p>Returns {@link Optional#empty()} for any predicate (or sub-predicate) that cannot be
 * expressed safely in Lance SQL; such predicates remain with the engine for post-scan evaluation,
 * so refusing push-down is always correct, only less efficient.
 */
final class LancePredicatePushDown {

    private LancePredicatePushDown() {}

    /** Translates {@code predicate} to a Lance SQL expression, or empty if not push-downable. */
    static Optional<String> toSql(Predicate predicate) {
        if (predicate instanceof LeafPredicate) {
            return toLeafSql((LeafPredicate) predicate);
        }

        if (predicate instanceof CompoundPredicate) {
            CompoundPredicate compound = (CompoundPredicate) predicate;
            String op;
            if (compound.function() instanceof And) {
                op = "AND";
            } else if (compound.function() instanceof Or) {
                op = "OR";
            } else {
                return Optional.empty();
            }

            List<String> childSql = new ArrayList<>();
            for (Predicate child : compound.children()) {
                Optional<String> sql = toSql(child);
                if (!sql.isPresent()) {
                    // One untranslatable child poisons the whole compound: an OR must not
                    // drop a branch, and partial AND push-down is handled per top-level
                    // conjunct by the caller instead.
                    return Optional.empty();
                }
                childSql.add("(" + sql.get() + ")");
            }
            if (childSql.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(String.join(" " + op + " ", childSql));
        }

        return Optional.empty();
    }

    private static Optional<String> toLeafSql(LeafPredicate predicate) {
        String column = quoteIdentifier(predicate.fieldName());

        if (predicate.function() instanceof IsNull) {
            return Optional.of(column + " IS NULL");
        }
        if (predicate.function() instanceof IsNotNull) {
            return Optional.of(column + " IS NOT NULL");
        }

        if (predicate.function() instanceof In || predicate.function() instanceof NotIn) {
            if (predicate.literals() == null || predicate.literals().isEmpty()) {
                return Optional.empty();
            }

            List<String> literals = new ArrayList<>();
            for (Object literal : predicate.literals()) {
                Optional<String> sql = literalToSql(literal);
                if (!sql.isPresent()) {
                    return Optional.empty();
                }
                literals.add(sql.get());
            }

            String op = predicate.function() instanceof In ? " IN " : " NOT IN ";
            return Optional.of(column + op + "(" + String.join(", ", literals) + ")");
        }

        // All remaining supported functions are binary comparisons with exactly one literal.
        if (predicate.literals() == null || predicate.literals().size() != 1) {
            return Optional.empty();
        }
        Optional<String> literal = literalToSql(predicate.literals().get(0));
        if (!literal.isPresent()) {
            return Optional.empty();
        }

        if (predicate.function() instanceof Equal) {
            return Optional.of(column + " = " + literal.get());
        }
        if (predicate.function() instanceof NotEqual) {
            return Optional.of(column + " <> " + literal.get());
        }
        if (predicate.function() instanceof GreaterThan) {
            return Optional.of(column + " > " + literal.get());
        }
        if (predicate.function() instanceof GreaterOrEqual) {
            return Optional.of(column + " >= " + literal.get());
        }
        if (predicate.function() instanceof LessThan) {
            return Optional.of(column + " < " + literal.get());
        }
        if (predicate.function() instanceof LessOrEqual) {
            return Optional.of(column + " <= " + literal.get());
        }

        return Optional.empty();
    }

    /** Double-quotes an identifier, escaping embedded double quotes. */
    private static String quoteIdentifier(String name) {
        return "\"" + name.replace("\"", "\"\"") + "\"";
    }

    /** Single-quotes a string literal, escaping embedded single quotes. */
    private static String quoteString(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    private static Optional<String> literalToSql(Object literal) {
        if (literal == null) {
            // SQL three-valued logic: "col = NULL" or "IN (NULL)" never matches, which would
            // silently change the predicate's semantics. Refuse push-down so the engine
            // evaluates the predicate with Fluss semantics instead.
            return Optional.empty();
        }

        if (literal instanceof String) {
            return Optional.of(quoteString((String) literal));
        }
        if (literal instanceof BinaryString) {
            return Optional.of(quoteString(((BinaryString) literal).toString()));
        }
        if (literal instanceof Boolean) {
            return Optional.of((Boolean) literal ? "TRUE" : "FALSE");
        }
        if (literal instanceof Byte
                || literal instanceof Short
                || literal instanceof Integer
                || literal instanceof Long
                || literal instanceof Float
                || literal instanceof Double) {
            return Optional.of(String.valueOf(literal));
        }
        if (literal instanceof BigDecimal) {
            // toPlainString avoids scientific notation, which SQL parsers may reject.
            return Optional.of(((BigDecimal) literal).toPlainString());
        }
        if (literal instanceof Decimal) {
            return Optional.of(((Decimal) literal).toBigDecimal().toPlainString());
        }
        if (literal instanceof LocalDate) {
            return Optional.of(quoteString(literal.toString()));
        }
        if (literal instanceof LocalDateTime) {
            return Optional.of(quoteString(literal.toString()));
        }
        if (literal instanceof OffsetDateTime) {
            return Optional.of(quoteString(literal.toString()));
        }
        if (literal instanceof Instant) {
            return Optional.of(
                    quoteString(((Instant) literal).atOffset(ZoneOffset.UTC).toString()));
        }
        if (literal instanceof TimestampNtz) {
            return Optional.of(
                    quoteString(((TimestampNtz) literal).toLocalDateTime().toString()));
        }
        if (literal instanceof TimestampLtz) {
            return Optional.of(quoteString(((TimestampLtz) literal).toInstant().toString()));
        }

        // Unknown literal type: not push-downable.
        return Optional.empty();
    }
}
package org.apache.fluss.lake.lance.source;

import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.lance.LanceConfig;
import org.apache.fluss.lake.source.RecordReader;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.record.GenericRecord;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.utils.CloseableIterator;

import com.lancedb.lance.Dataset;
import com.lancedb.lance.ReadOptions;
import com.lancedb.lance.ipc.LanceScanner;
import com.lancedb.lance.ipc.ScanOptions;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.Text;

import javax.annotation.Nullable;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;

/**
 * {@link RecordReader} that reads one {@link LanceSplit}, i.e. a single Lance fragment at a
 * given snapshot, converting Arrow batches into Fluss {@link LogRecord}s.
 */
public class LanceRecordReader implements RecordReader {
    private final LanceConfig lanceConfig;
    private final LanceSplit split;
    // Top-level field index paths to project, or null to read all data columns.
    private final @Nullable int[][] project;
    // Lance SQL filter pushed down from LanceLakeSource, or null for no filter.
    private final @Nullable String filterSql;

    public LanceRecordReader(
            Configuration configuration,
            TablePath tablePath,
            LanceSplit split,
            @Nullable int[][] project,
            @Nullable String filterSql) {
        this.lanceConfig =
                LanceConfig.from(
                        configuration.toMap(),
                        Collections.emptyMap(),
                        tablePath.getDatabaseName(),
                        tablePath.getTableName());
        this.split = split;
        this.project = project;
        this.filterSql = filterSql;
    }

    @Override
    public CloseableIterator<LogRecord> read() throws IOException {
        if (split.scanLimit() == 0) {
            // A zero scan limit means a global limit was exhausted by earlier splits.
            return CloseableIterator.emptyIterator();
        }

        ReadOptions readOptions = LanceConfig.genReadOptionFromConfig(lanceConfig);
        Dataset dataset = Dataset.open(lanceConfig.getDatasetUri(), readOptions);
        LanceScanner scanner = null;
        try {
            if (split.snapshotId() > 0) {
                // checkoutVersion returns a NEW dataset handle; ignoring the return value
                // would keep scanning the latest version and leak the checked-out handle.
                Dataset versioned = dataset.checkoutVersion(split.snapshotId());
                dataset.close();
                dataset = versioned;
            }

            List<String> dataColumns = listDataColumns(dataset);
            List<String> outputColumns = resolveOutputColumns(dataColumns, project);
            List<String> scanColumns = buildScanColumns(outputColumns);

            ScanOptions.Builder optionsBuilder =
                    new ScanOptions.Builder()
                            .fragmentIds(Collections.singletonList(split.fragmentId()));
            if (!scanColumns.isEmpty()) {
                optionsBuilder.columns(scanColumns);
            }
            if (filterSql != null && !filterSql.isEmpty()) {
                optionsBuilder.filter(filterSql);
            }
            if (split.scanLimit() > 0) {
                optionsBuilder.limit(split.scanLimit());
            }

            scanner = dataset.newScan(optionsBuilder.build());
            ArrowReader arrowReader = scanner.scanBatches();
            // The iterator takes ownership of dataset, scanner and reader and closes them.
            return new LanceRecordIterator(dataset, scanner, arrowReader, outputColumns);
        } catch (RuntimeException e) {
            // Release native resources if anything fails before the iterator takes ownership.
            closeQuietly(scanner);
            closeQuietly(dataset);
            throw e;
        }
    }

    /** Returns all schema fields except the Fluss system columns, in schema order. */
    private static List<String> listDataColumns(Dataset dataset) {
        List<String> columns = new ArrayList<>();
        for (Field field : dataset.getSchema().getFields()) {
            if (isSystemColumn(field.getName())) {
                continue;
            }
            columns.add(field.getName());
        }
        return columns;
    }

    /**
     * Maps top-level projection index paths to column names. Nested paths beyond the first
     * index and out-of-range indices are silently skipped — assumes the engine only pushes
     * valid top-level projections; TODO confirm.
     */
    private static List<String> resolveOutputColumns(
            List<String> dataColumns, @Nullable int[][] project) {
        if (project == null) {
            return dataColumns;
        }

        List<String> projected = new ArrayList<>();
        for (int[] indexPath : project) {
            if (indexPath == null || indexPath.length == 0) {
                continue;
            }
            int topLevelIndex = indexPath[0];
            if (topLevelIndex < 0 || topLevelIndex >= dataColumns.size()) {
                continue;
            }
            projected.add(dataColumns.get(topLevelIndex));
        }
        return projected;
    }

    /** Output columns plus the offset/timestamp system columns needed to build LogRecords. */
    private static List<String> buildScanColumns(List<String> outputColumns) {
        Set<String> columns = new LinkedHashSet<>(outputColumns);
        columns.add(OFFSET_COLUMN_NAME);
        columns.add(TIMESTAMP_COLUMN_NAME);
        return new ArrayList<>(columns);
    }

    private static boolean isSystemColumn(String name) {
        return OFFSET_COLUMN_NAME.equals(name)
                || TIMESTAMP_COLUMN_NAME.equals(name)
                || BUCKET_COLUMN_NAME.equals(name);
    }

    /** Best-effort close; used both on setup failure and by the iterator's close(). */
    private static void closeQuietly(@Nullable Object resource) {
        if (resource instanceof AutoCloseable) {
            try {
                ((AutoCloseable) resource).close();
            } catch (Exception ignored) {
                // best-effort close: the primary failure (if any) is already propagating
            }
        }
    }

    /** Iterates Arrow batches from a Lance scan, converting each row into a LogRecord. */
    private static final class LanceRecordIterator implements CloseableIterator<LogRecord> {
        private final Dataset dataset;
        private final LanceScanner scanner;
        private final ArrowReader arrowReader;
        private final List<String> outputColumns;

        private VectorSchemaRoot currentBatch;
        private int rowIndexInBatch;

        private LanceRecordIterator(
                Dataset dataset,
                LanceScanner scanner,
                ArrowReader arrowReader,
                List<String> outputColumns) {
            this.dataset = dataset;
            this.scanner = scanner;
            this.arrowReader = arrowReader;
            this.outputColumns = outputColumns;
            this.currentBatch = null;
            this.rowIndexInBatch = 0;
        }

        @Override
        public boolean hasNext() {
            try {
                // Advance to the next non-empty batch; Arrow may deliver empty batches.
                while (currentBatch == null || rowIndexInBatch >= currentBatch.getRowCount()) {
                    if (!arrowReader.loadNextBatch()) {
                        return false;
                    }
                    currentBatch = arrowReader.getVectorSchemaRoot();
                    rowIndexInBatch = 0;
                }
                return true;
            } catch (IOException e) {
                throw new RuntimeException("Failed to read Lance scan batch.", e);
            }
        }

        @Override
        public LogRecord next() {
            GenericRow row = new GenericRow(outputColumns.size());
            for (int i = 0; i < outputColumns.size(); i++) {
                FieldVector vector = currentBatch.getVector(outputColumns.get(i));
                row.setField(i, vectorToInternalValue(vector, rowIndexInBatch));
            }

            long offset =
                    getLong(currentBatch.getVector(OFFSET_COLUMN_NAME), rowIndexInBatch, -1L);
            long timestamp =
                    getLong(
                            currentBatch.getVector(TIMESTAMP_COLUMN_NAME),
                            rowIndexInBatch,
                            System.currentTimeMillis());

            rowIndexInBatch++;
            return new GenericRecord(offset, timestamp, ChangeType.APPEND_ONLY, row);
        }

        @Override
        public void close() {
            closeQuietly(arrowReader);
            closeQuietly(scanner);
            closeQuietly(dataset);
        }

        /** Reads a long from a vector, falling back to {@code defaultValue} for null/absent. */
        private static long getLong(@Nullable ValueVector vector, int row, long defaultValue) {
            if (vector == null || vector.isNull(row)) {
                return defaultValue;
            }
            if (vector instanceof BigIntVector) {
                return ((BigIntVector) vector).get(row);
            }
            Object value = vector.getObject(row);
            if (value instanceof Number) {
                return ((Number) value).longValue();
            }
            return defaultValue;
        }

        /** Converts one Arrow cell into the Fluss internal-row representation. */
        private static Object vectorToInternalValue(@Nullable ValueVector vector, int row) {
            if (vector == null || vector.isNull(row)) {
                return null;
            }

            if (vector instanceof IntVector) {
                return ((IntVector) vector).get(row);
            }
            if (vector instanceof BigIntVector) {
                return ((BigIntVector) vector).get(row);
            }
            if (vector instanceof TinyIntVector) {
                return ((TinyIntVector) vector).get(row);
            }
            if (vector instanceof SmallIntVector) {
                return ((SmallIntVector) vector).get(row);
            }
            if (vector instanceof Float4Vector) {
                return ((Float4Vector) vector).get(row);
            }
            if (vector instanceof Float8Vector) {
                return ((Float8Vector) vector).get(row);
            }
            if (vector instanceof BitVector) {
                return ((BitVector) vector).get(row) == 1;
            }
            if (vector instanceof VarCharVector) {
                return BinaryString.fromString(((VarCharVector) vector).getObject(row).toString());
            }
            if (vector instanceof VarBinaryVector) {
                return ((VarBinaryVector) vector).get(row);
            }
            if (vector instanceof FixedSizeBinaryVector) {
                return ((FixedSizeBinaryVector) vector).get(row);
            }
            if (vector instanceof DecimalVector) {
                DecimalVector decimalVector = (DecimalVector) vector;
                BigDecimal value = decimalVector.getObject(row);
                return Decimal.fromBigDecimal(
                        value, decimalVector.getPrecision(), decimalVector.getScale());
            }

            // Fallback for vector types not handled above; normalize text-like values.
            Object value = vector.getObject(row);
            if (value instanceof Text) {
                return BinaryString.fromString(value.toString());
            }
            if (value instanceof String) {
                return BinaryString.fromString((String) value);
            }
            return value;
        }
    }
}
package org.apache.fluss.lake.lance.source;

import org.apache.fluss.lake.source.LakeSplit;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

/**
 * A single readable unit of a Lance table: one fragment at a given snapshot, optionally capped
 * by a per-split scan limit.
 */
public class LanceSplit implements LakeSplit, Serializable {
    private static final long serialVersionUID = 1L;

    private final int fragmentId;
    private final long snapshotId;
    private final long fragmentRows;
    private final long scanLimit;
    private final int bucket;
    private final List<String> partition;

    public LanceSplit(
            int fragmentId,
            long snapshotId,
            long fragmentRows,
            long scanLimit,
            int bucket,
            List<String> partition) {
        this.fragmentId = fragmentId;
        this.snapshotId = snapshotId;
        this.fragmentRows = fragmentRows;
        this.scanLimit = scanLimit;
        this.bucket = bucket;
        // Normalize an absent partition list to empty; otherwise expose an unmodifiable view.
        if (partition == null) {
            this.partition = Collections.emptyList();
        } else {
            this.partition = Collections.unmodifiableList(partition);
        }
    }

    public int fragmentId() {
        return fragmentId;
    }

    public long snapshotId() {
        return snapshotId;
    }

    public long fragmentRows() {
        return fragmentRows;
    }

    /** Maximum rows to scan from this split; negative means unbounded, zero means skip. */
    public long scanLimit() {
        return scanLimit;
    }

    @Override
    public int bucket() {
        return bucket;
    }

    @Override
    public List<String> partition() {
        return partition;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("LanceSplit{");
        sb.append("fragmentId=").append(fragmentId);
        sb.append(", snapshotId=").append(snapshotId);
        sb.append(", fragmentRows=").append(fragmentRows);
        sb.append(", scanLimit=").append(scanLimit);
        sb.append(", bucket=").append(bucket);
        sb.append(", partition=").append(partition);
        sb.append('}');
        return sb.toString();
    }
}
package org.apache.fluss.lake.lance.source;

import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.lance.LanceConfig;
import org.apache.fluss.lake.source.Planner;
import org.apache.fluss.metadata.TablePath;

import com.lancedb.lance.Dataset;
import com.lancedb.lance.Fragment;
import com.lancedb.lance.ReadOptions;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plans one {@link LanceSplit} per Lance fragment at the requested snapshot, distributing an
 * optional global row limit greedily across fragments in order.
 */
public class LanceSplitPlanner implements Planner<LanceSplit> {
    private final LanceConfig lanceConfig;
    private final long snapshotId;
    // Global row limit pushed down from the source; null means unbounded.
    private final @Nullable Integer limit;

    public LanceSplitPlanner(
            Configuration configuration,
            TablePath tablePath,
            long snapshotId,
            @Nullable Integer limit) {
        this.lanceConfig =
                LanceConfig.from(
                        configuration.toMap(),
                        Collections.emptyMap(),
                        tablePath.getDatabaseName(),
                        tablePath.getTableName());
        this.snapshotId = snapshotId;
        this.limit = limit;
    }

    @Override
    public List<LanceSplit> plan() {
        ReadOptions readOptions = LanceConfig.genReadOptionFromConfig(lanceConfig);

        List<LanceSplit> splits = new ArrayList<>();
        Dataset dataset = Dataset.open(lanceConfig.getDatasetUri(), readOptions);
        try {
            if (snapshotId > 0) {
                // checkoutVersion returns a NEW dataset handle; ignoring the return value
                // would plan against the latest version and leak the checked-out handle.
                Dataset versioned = dataset.checkoutVersion(snapshotId);
                dataset.close();
                dataset = versioned;
            }

            // Greedily assign the remaining budget fragment by fragment; once exhausted,
            // later splits get scanLimit == 0 and are skipped by the reader.
            long remaining = limit == null ? -1L : Math.max(0L, limit.longValue());
            for (Fragment fragment : dataset.getFragments()) {
                long fragmentRows = fragment.countRows();
                long scanLimit = -1L; // -1 = unbounded when no global limit was pushed down
                if (limit != null) {
                    scanLimit = Math.max(0L, Math.min(fragmentRows, remaining));
                    remaining -= scanLimit;
                }

                splits.add(
                        new LanceSplit(
                                fragment.getId(),
                                snapshotId,
                                fragmentRows,
                                scanLimit,
                                -1,
                                Collections.emptyList()));
            }
        } finally {
            dataset.close();
        }
        return splits;
    }
}
package org.apache.fluss.lake.lance.source;

import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Versioned binary serializer for {@link LanceSplit}.
 *
 * <p>Wire format (v1): fragmentId (int), snapshotId (long), fragmentRows (long), scanLimit
 * (long), bucket (int), partition size (int) followed by that many UTF strings.
 */
public class LanceSplitSerializer implements SimpleVersionedSerializer<LanceSplit> {
    private static final int VERSION_1 = 1;

    @Override
    public int getVersion() {
        return VERSION_1;
    }

    @Override
    public byte[] serialize(LanceSplit split) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(split.fragmentId());
            out.writeLong(split.snapshotId());
            out.writeLong(split.fragmentRows());
            out.writeLong(split.scanLimit());
            out.writeInt(split.bucket());

            List<String> partition = split.partition();
            out.writeInt(partition.size());
            for (String value : partition) {
                // writeUTF rejects null, so a null element is encoded as the empty string.
                out.writeUTF(value == null ? "" : value);
            }
        }
        return buffer.toByteArray();
    }

    @Override
    public LanceSplit deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION_1) {
            throw new IOException("Unsupported LanceSplit serialization version: " + version);
        }

        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int fragmentId = in.readInt();
            long snapshotId = in.readLong();
            long fragmentRows = in.readLong();
            long scanLimit = in.readLong();
            int bucket = in.readInt();

            int partitionSize = in.readInt();
            List<String> partition;
            if (partitionSize <= 0) {
                partition = Collections.emptyList();
            } else {
                partition = new ArrayList<>(partitionSize);
                for (int i = 0; i < partitionSize; i++) {
                    partition.add(in.readUTF());
                }
            }

            return new LanceSplit(
                    fragmentId, snapshotId, fragmentRows, scanLimit, bucket, partition);
        }
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lance.source; + +import org.apache.fluss.lake.source.RecordReader; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.predicate.PredicateBuilder; +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.IntType; +import org.apache.fluss.types.RowType; +import org.apache.fluss.types.StringType; +import org.apache.fluss.utils.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test class for {@link LanceRecordReader}. 
*/ +class LanceRecordReaderTest extends LanceSourceTestBase { + + @Test + void testReadAllRows() throws Exception { + createEmptyDataset(); + long snapshotId = appendRows(Arrays.asList(row(1, "a", 0, 101L, 1001L), row(2, "b", 0, 102L, 1002L))); + snapshotId = appendRows(Arrays.asList(row(3, "c", 1, 103L, 1003L), row(4, "d", 1, 104L, 1004L))); + final long plannedSnapshotId = snapshotId; + + LanceLakeSource source = new LanceLakeSource(config, tablePath); + List splits = source.createPlanner(() -> plannedSnapshotId).plan(); + + List records = readAll(source, splits); + assertThat(records).hasSize(4); + + Set ids = new HashSet<>(); + Set names = new HashSet<>(); + Set offsets = new HashSet<>(); + for (LogRecord record : records) { + InternalRow row = record.getRow(); + ids.add(row.getInt(0)); + names.add(row.getString(1).toString()); + offsets.add(record.logOffset()); + assertThat(record.timestamp()).isGreaterThan(0L); + } + + assertThat(ids).containsExactlyInAnyOrder(1, 2, 3, 4); + assertThat(names).containsExactlyInAnyOrder("a", "b", "c", "d"); + assertThat(offsets).containsExactlyInAnyOrder(101L, 102L, 103L, 104L); + } + + @Test + void testReadWithProjectFilterAndLimit() throws Exception { + createEmptyDataset(); + long snapshotId = appendRows(Arrays.asList(row(1, "a", 0, 101L, 1001L), row(2, "b", 0, 102L, 1002L))); + snapshotId = appendRows(Arrays.asList(row(3, "c", 1, 103L, 1003L), row(4, "d", 1, 104L, 1004L))); + final long plannedSnapshotId = snapshotId; + + LanceLakeSource source = new LanceLakeSource(config, tablePath); + source.withProject(new int[][] {new int[] {1}}); + + RowType rowType = + RowType.of( + new org.apache.fluss.types.DataType[] {new IntType(), new StringType()}, + new String[] {"id", "name"}); + PredicateBuilder predicateBuilder = new PredicateBuilder(rowType); + Predicate predicate = predicateBuilder.lessOrEqual(predicateBuilder.indexOf("id"), 2); + LanceLakeSource.FilterPushDownResult pushDownResult = + 
source.withFilters(Collections.singletonList(predicate)); + assertThat(pushDownResult.acceptedPredicates()).hasSize(1); + assertThat(pushDownResult.remainingPredicates()).isEmpty(); + + source.withLimit(2); + + List splits = source.createPlanner(() -> plannedSnapshotId).plan(); + List records = readAll(source, splits); + + assertThat(records).hasSize(2); + List names = new ArrayList<>(); + for (LogRecord record : records) { + InternalRow row = record.getRow(); + assertThat(row.getFieldCount()).isEqualTo(1); + names.add(row.getString(0).toString()); + } + assertThat(names).containsExactlyInAnyOrder("a", "b"); + } + + private List readAll(LanceLakeSource source, List splits) + throws IOException { + List all = new ArrayList<>(); + for (LanceSplit split : splits) { + RecordReader recordReader = source.createRecordReader(() -> split); + CloseableIterator iterator = recordReader.read(); + try { + while (iterator.hasNext()) { + all.add(iterator.next()); + } + } finally { + iterator.close(); + } + } + return all; + } +} diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSourceTestBase.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSourceTestBase.java new file mode 100644 index 0000000000..eb26ceb574 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSourceTestBase.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lance.source; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.lance.LanceConfig; +import org.apache.fluss.lake.lance.utils.LanceDatasetAdapter; +import org.apache.fluss.metadata.TablePath; + +import com.lancedb.lance.Dataset; +import com.lancedb.lance.Fragment; +import com.lancedb.lance.FragmentMetadata; +import com.lancedb.lance.ReadOptions; +import com.lancedb.lance.WriteParams; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; +import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; +import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; +import static org.assertj.core.api.Assertions.assertThat; + +/** Base utilities for Lance source tests. 
*/ +class LanceSourceTestBase { + + protected static final String DEFAULT_DB = "fluss_lance"; + protected static final String DEFAULT_TABLE = "source_test_table"; + + @TempDir protected File tempWarehouseDir; + + protected Configuration config; + protected TablePath tablePath; + protected Schema schema; + protected LanceConfig lanceConfig; + + @BeforeEach + void setupBase() { + Assumptions.assumeTrue( + isLanceJniAvailable(), + "Skip Lance source tests because Lance JNI is unavailable for current runtime."); + + this.config = new Configuration(); + this.config.setString("warehouse", tempWarehouseDir.getAbsolutePath()); + + this.tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE); + this.lanceConfig = + LanceConfig.from( + config.toMap(), + Collections.emptyMap(), + tablePath.getDatabaseName(), + tablePath.getTableName()); + + this.schema = + new Schema( + Arrays.asList( + new Field( + "id", + FieldType.nullable(new ArrowType.Int(32, true)), + null), + new Field( + "name", + FieldType.nullable(ArrowType.Utf8.INSTANCE), + null), + new Field( + BUCKET_COLUMN_NAME, + FieldType.nullable(new ArrowType.Int(32, true)), + null), + new Field( + OFFSET_COLUMN_NAME, + FieldType.nullable(new ArrowType.Int(64, true)), + null), + new Field( + TIMESTAMP_COLUMN_NAME, + FieldType.nullable(new ArrowType.Int(64, true)), + null))); + } + + private static boolean isLanceJniAvailable() { + try { + Class.forName("com.lancedb.lance.Dataset", true, LanceSourceTestBase.class.getClassLoader()); + return true; + } catch (Throwable t) { + return false; + } + } + + protected void createEmptyDataset() { + LanceDatasetAdapter.createDataset( + lanceConfig.getDatasetUri(), schema, new WriteParams.Builder().build()); + } + + protected long appendRows(List rows) { + WriteParams writeParams = new WriteParams.Builder().build(); + + try (RootAllocator allocator = new RootAllocator(); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + IntVector idVector = (IntVector) 
root.getVector("id"); + VarCharVector nameVector = (VarCharVector) root.getVector("name"); + IntVector bucketVector = (IntVector) root.getVector(BUCKET_COLUMN_NAME); + BigIntVector offsetVector = (BigIntVector) root.getVector(OFFSET_COLUMN_NAME); + BigIntVector tsVector = (BigIntVector) root.getVector(TIMESTAMP_COLUMN_NAME); + + for (int i = 0; i < rows.size(); i++) { + RowData row = rows.get(i); + idVector.setSafe(i, row.id); + nameVector.setSafe(i, row.name.getBytes(StandardCharsets.UTF_8)); + bucketVector.setSafe(i, row.bucket); + offsetVector.setSafe(i, row.offset); + tsVector.setSafe(i, row.timestamp); + } + root.setRowCount(rows.size()); + + List fragments = + Fragment.create(lanceConfig.getDatasetUri(), allocator, root, writeParams); + return LanceDatasetAdapter.commitAppend(lanceConfig, fragments, Collections.emptyMap()); + } + } + + protected Map currentFragmentRowCount() { + ReadOptions readOptions = LanceConfig.genReadOptionFromConfig(lanceConfig); + Map result = new HashMap<>(); + try (Dataset dataset = Dataset.open(lanceConfig.getDatasetUri(), readOptions)) { + for (Fragment fragment : dataset.getFragments()) { + result.put(fragment.getId(), fragment.countRows()); + } + } + return result; + } + + protected void assertDatasetHasAtLeastFragments(int minFragmentCount) { + ReadOptions readOptions = LanceConfig.genReadOptionFromConfig(lanceConfig); + try (Dataset dataset = Dataset.open(lanceConfig.getDatasetUri(), readOptions)) { + assertThat(dataset.getFragments().size()).isGreaterThanOrEqualTo(minFragmentCount); + } + } + + protected static RowData row(int id, String name, int bucket, long offset, long timestamp) { + return new RowData(id, name, bucket, offset, timestamp); + } + + protected static class RowData { + private final int id; + private final String name; + private final int bucket; + private final long offset; + private final long timestamp; + + private RowData(int id, String name, int bucket, long offset, long timestamp) { + this.id = id; + 
this.name = name; + this.bucket = bucket; + this.offset = offset; + this.timestamp = timestamp; + } + } +} diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSplitPlannerTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSplitPlannerTest.java new file mode 100644 index 0000000000..b341761120 --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSplitPlannerTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lance.source; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test class for {@link LanceSplitPlanner}. 
*/ +class LanceSplitPlannerTest extends LanceSourceTestBase { + + @Test + void testPlanSplitsWithoutLimit() throws Exception { + createEmptyDataset(); + long snapshotId = appendRows(Arrays.asList(row(1, "a", 0, 101L, 1001L), row(2, "b", 0, 102L, 1002L))); + snapshotId = appendRows(Arrays.asList(row(3, "c", 1, 103L, 1003L), row(4, "d", 1, 104L, 1004L), row(5, "e", 1, 105L, 1005L))); + + assertDatasetHasAtLeastFragments(2); + + LanceSplitPlanner planner = new LanceSplitPlanner(config, tablePath, snapshotId, null); + List splits = planner.plan(); + + Map fragmentRows = currentFragmentRowCount(); + assertThat(splits).hasSize(fragmentRows.size()); + + for (LanceSplit split : splits) { + assertThat(split.snapshotId()).isEqualTo(snapshotId); + assertThat(split.bucket()).isEqualTo(-1); + assertThat(split.partition()).isEmpty(); + assertThat(split.scanLimit()).isEqualTo(-1L); + assertThat(fragmentRows).containsKey(split.fragmentId()); + assertThat(split.fragmentRows()).isEqualTo(fragmentRows.get(split.fragmentId())); + } + } + + @Test + void testPlanSplitsWithLimit() throws Exception { + createEmptyDataset(); + long snapshotId = appendRows(Arrays.asList(row(1, "a", 0, 101L, 1001L), row(2, "b", 0, 102L, 1002L))); + snapshotId = appendRows(Arrays.asList(row(3, "c", 1, 103L, 1003L), row(4, "d", 1, 104L, 1004L), row(5, "e", 1, 105L, 1005L))); + final long plannedSnapshotId = snapshotId; + + LanceSplitPlanner planner = new LanceSplitPlanner(config, tablePath, plannedSnapshotId, 3); + List splits = planner.plan(); + + assertThat(splits).isNotEmpty(); + assertThat(splits.stream().allMatch(s -> s.snapshotId() == plannedSnapshotId)).isTrue(); + + long totalAssignedLimit = + splits.stream() + .mapToLong(s -> Math.max(0L, s.scanLimit())) + .sum(); + assertThat(totalAssignedLimit).isEqualTo(3L); + + List positiveLimitSplits = + splits.stream().filter(s -> s.scanLimit() > 0).collect(Collectors.toList()); + assertThat(positiveLimitSplits).isNotEmpty(); + for (LanceSplit split : 
positiveLimitSplits) { + assertThat(split.scanLimit()).isLessThanOrEqualTo(split.fragmentRows()); + } + } +} diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSplitSerializerTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSplitSerializerTest.java new file mode 100644 index 0000000000..7228975bbe --- /dev/null +++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/source/LanceSplitSerializerTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.lance.source; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test class for {@link LanceSplitSerializer}. 
*/ +class LanceSplitSerializerTest { + + + @Test + void testSerializeDeserializeWithFullFields() throws IOException { + LanceSplit split = + new LanceSplit(12, 101L, 999L, 77L, 3, Arrays.asList("p0", "p1")); + + LanceSplitSerializer serializer = new LanceSplitSerializer(); + byte[] bytes = serializer.serialize(split); + LanceSplit restored = serializer.deserialize(serializer.getVersion(), bytes); + + assertThat(restored.fragmentId()).isEqualTo(12); + assertThat(restored.snapshotId()).isEqualTo(101L); + assertThat(restored.fragmentRows()).isEqualTo(999L); + assertThat(restored.scanLimit()).isEqualTo(77L); + assertThat(restored.bucket()).isEqualTo(3); + assertThat(restored.partition()).containsExactly("p0", "p1"); + } + + @Test + void testSerializeDeserializeWithEmptyPartition() throws IOException { + LanceSplit split = new LanceSplit(1, 2L, 3L, -1L, -1, Collections.emptyList()); + + LanceSplitSerializer serializer = new LanceSplitSerializer(); + byte[] bytes = serializer.serialize(split); + LanceSplit restored = serializer.deserialize(serializer.getVersion(), bytes); + + assertThat(restored.partition()).isEmpty(); + assertThat(restored.scanLimit()).isEqualTo(-1L); + } +}