From c8d171150f9d2449d479df84eadb362ccfa9d262 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sat, 21 Mar 2026 11:57:05 +0800 Subject: [PATCH 1/2] bootstrp --- .../paimon/lookup/sort/db/SimpleLsmKvDb.java | 86 ++++++++++++ .../lookup/sort/db/SimpleLsmKvDbTest.java | 124 +++++++++++++++++ .../clustering/ClusteringCompactManager.java | 130 +++++++++++++++--- 3 files changed, 319 insertions(+), 21 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java index 7ee095da8696..8871fcab48d2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -94,6 +95,7 @@ public class SimpleLsmKvDb implements Closeable { private final SortLookupStoreFactory storeFactory; private final Comparator keyComparator; private final long memTableFlushThreshold; + private final long maxSstFileSize; private final LsmCompactor compactor; /** Active MemTable: key -> value bytes (empty byte[] = tombstone). */ @@ -127,6 +129,7 @@ private SimpleLsmKvDb( this.storeFactory = storeFactory; this.keyComparator = keyComparator; this.memTableFlushThreshold = memTableFlushThreshold; + this.maxSstFileSize = maxSstFileSize; this.memTable = new TreeMap<>(keyComparator); this.memTableSize = 0; this.levels = new ArrayList<>(); @@ -225,6 +228,89 @@ public void delete(byte[] key) throws IOException { maybeFlushMemTable(); } + /** + * Bulk-load globally sorted entries directly into SST files at the deepest level, bypassing + * MemTable, flush, and compaction entirely. The database must be empty when this is called. 
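+ * <p>A minimal usage sketch (illustrative only, mirroring the new tests below; assumes string
+ * keys and values encoded as UTF-8 and an iterator already ordered by the DB's key comparator):
+ *
+ * <pre>{@code
+ * List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+ * entries.add(new AbstractMap.SimpleImmutableEntry<>(
+ *         "key-00001".getBytes(UTF_8), "value-00001".getBytes(UTF_8)));
+ * entries.add(new AbstractMap.SimpleImmutableEntry<>(
+ *         "key-00002".getBytes(UTF_8), "value-00002".getBytes(UTF_8)));
+ * db.bulkLoad(entries.iterator());
+ * }</pre>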
+ * + * @param sortedEntries an iterator of key-value pairs in sorted order (by the DB's key + * comparator) + */ + public void bulkLoad(Iterator> sortedEntries) throws IOException { + ensureOpen(); + if (!memTable.isEmpty() || getSstFileCount() > 0) { + throw new IllegalStateException( + "bulkLoad requires an empty database (no memTable entries and no SST files)"); + } + + int targetLevel = MAX_LEVELS - 1; + List targetLevelFiles = levels.get(targetLevel); + + SortLookupStoreWriter currentWriter = null; + File currentSstFile = null; + MemorySlice currentFileMinKey = null; + MemorySlice currentFileMaxKey = null; + long currentBatchSize = 0; + + try { + while (sortedEntries.hasNext()) { + Map.Entry entry = sortedEntries.next(); + byte[] key = entry.getKey(); + byte[] value = entry.getValue(); + + if (currentWriter == null) { + currentSstFile = newSstFile(); + currentWriter = storeFactory.createWriter(currentSstFile, null); + currentFileMinKey = MemorySlice.wrap(key); + currentBatchSize = 0; + } + + currentWriter.put(key, value); + currentFileMaxKey = MemorySlice.wrap(key); + currentBatchSize += key.length + value.length; + + if (currentBatchSize >= maxSstFileSize) { + currentWriter.close(); + targetLevelFiles.add( + new SstFileMetadata( + currentSstFile, + currentFileMinKey, + currentFileMaxKey, + 0, + targetLevel)); + currentWriter = null; + currentSstFile = null; + currentFileMinKey = null; + currentFileMaxKey = null; + } + } + + if (currentWriter != null) { + currentWriter.close(); + targetLevelFiles.add( + new SstFileMetadata( + currentSstFile, + currentFileMinKey, + currentFileMaxKey, + 0, + targetLevel)); + } + } catch (IOException | RuntimeException e) { + if (currentWriter != null) { + try { + currentWriter.close(); + } catch (IOException suppressed) { + e.addSuppressed(suppressed); + } + } + throw e; + } + + LOG.info( + "Bulk-loaded {} SST files directly to level {}", + targetLevelFiles.size(), + targetLevel); + } + // ------------------------------------------------------------------------- // Read Operations // ------------------------------------------------------------------------- diff --git a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java index 52dbfd3d011d..44d7d46ff506 100644 --- a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java @@ -28,7 +28,11 @@ import java.io.File; import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Comparator; +import java.util.List; +import java.util.Map; import static java.nio.charset.StandardCharsets.UTF_8; @@ -1321,6 +1325,126 @@ public void testGroupMergePreservesDeleteSemantics() throws IOException { } } + @Test + public void testBulkLoad() throws IOException { + try (SimpleLsmKvDb db = createDb()) { + // Prepare sorted entries + List> entries = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + String key = String.format("key-%05d", i); + String value = String.format("value-%05d", i); + entries.add( + new AbstractMap.SimpleImmutableEntry<>( + key.getBytes(UTF_8), value.getBytes(UTF_8))); + } + + db.bulkLoad(entries.iterator()); + + // All data at deepest level, no L0 files + Assertions.assertEquals(0, db.getLevelFileCount(0)); + Assertions.assertTrue(db.getLevelFileCount(SimpleLsmKvDb.MAX_LEVELS - 1) > 0); + + // All keys should be readable + for 
(int i = 0; i < 100; i++) { + String expected = String.format("value-%05d", i); + String actual = getString(db, String.format("key-%05d", i)); + Assertions.assertEquals(expected, actual, "Mismatch at index " + i); + } + } + } + + @Test + public void testBulkLoadMultipleSstFiles() throws IOException { + // Use a small maxSstFileSize to force multiple SST files + SimpleLsmKvDb db = + SimpleLsmKvDb.builder(new File(tempDir.toFile(), "bulk-multi-db")) + .memTableFlushThreshold(1024) + .maxSstFileSize(512) + .blockSize(128) + .level0FileNumCompactTrigger(4) + .compressOptions(new CompressOptions("none", 1)) + .build(); + + try { + List> entries = new ArrayList<>(); + for (int i = 0; i < 200; i++) { + String key = String.format("key-%05d", i); + String value = String.format("value-%05d", i); + entries.add( + new AbstractMap.SimpleImmutableEntry<>( + key.getBytes(UTF_8), value.getBytes(UTF_8))); + } + + db.bulkLoad(entries.iterator()); + + // Multiple SST files should be created at the deepest level + int deepestLevelFiles = db.getLevelFileCount(SimpleLsmKvDb.MAX_LEVELS - 1); + Assertions.assertTrue( + deepestLevelFiles > 1, + "Expected multiple SST files at deepest level, got " + deepestLevelFiles); + Assertions.assertEquals(0, db.getLevelFileCount(0)); + + // All keys should be readable + for (int i = 0; i < 200; i++) { + String expected = String.format("value-%05d", i); + String actual = getString(db, String.format("key-%05d", i)); + Assertions.assertEquals(expected, actual, "Mismatch at index " + i); + } + } finally { + db.close(); + } + } + + @Test + public void testBulkLoadEmptyIterator() throws IOException { + try (SimpleLsmKvDb db = createDb()) { + List> empty = new ArrayList<>(); + db.bulkLoad(empty.iterator()); + + Assertions.assertEquals(0, db.getSstFileCount()); + Assertions.assertNull(getString(db, "any-key")); + } + } + + @Test + public void testBulkLoadThenPutAndGet() throws IOException { + try (SimpleLsmKvDb db = createDb()) { + // Bulk load initial data + List> entries = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + String key = String.format("key-%05d", i); + String value = String.format("value-%05d", i); + entries.add( + new AbstractMap.SimpleImmutableEntry<>( + key.getBytes(UTF_8), value.getBytes(UTF_8))); + } + db.bulkLoad(entries.iterator()); + + // Now use normal put to add/overwrite data + putString(db, "key-00000", "overwritten"); + putString(db, "key-99999", "new-key"); + + Assertions.assertEquals("overwritten", getString(db, "key-00000")); + Assertions.assertEquals("new-key", getString(db, "key-99999")); + Assertions.assertEquals("value-00025", getString(db, String.format("key-%05d", 25))); + } + } + + @Test + public void testBulkLoadFailsOnNonEmptyDb() throws IOException { + try (SimpleLsmKvDb db = createDb()) { + putString(db, "existing", "data"); + + List> entries = new ArrayList<>(); + entries.add( + new AbstractMap.SimpleImmutableEntry<>( + "key".getBytes(UTF_8), "value".getBytes(UTF_8))); + + Assertions.assertThrows( + IllegalStateException.class, () -> db.bulkLoad(entries.iterator())); + } + } + private static void putString(SimpleLsmKvDb db, String key, String value) throws IOException { db.put(key.getBytes(UTF_8), value.getBytes(UTF_8)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java index 8c8806daeb15..72cbc8078459 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java @@ -30,6 +30,7 @@ import org.apache.paimon.compression.BlockCompressionFactory; import org.apache.paimon.compression.CompressOptions; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.BinaryRowSerializer; import org.apache.paimon.data.serializer.InternalRowSerializer; @@ -54,7 +55,9 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReader.RecordIterator; import org.apache.paimon.sort.BinaryExternalSortBuffer; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer; import org.apache.paimon.utils.MutableObjectIterator; @@ -65,10 +68,13 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.PriorityQueue; import java.util.concurrent.ExecutionException; @@ -177,31 +183,113 @@ public ClusteringCompactManager( } private void bootstrapKeyIndex(List restoreFiles) { + // Build a combined RowType: key fields + one VARBINARY field for the encoded value + List combinedFields = new ArrayList<>(); + List keyFields = keyType.getFields(); + for (int i = 0; i < keyFields.size(); i++) { + DataField kf = keyFields.get(i); + combinedFields.add(new DataField(i, kf.name(), kf.type())); + } + int valueFieldIndex = keyFields.size(); + combinedFields.add( + new DataField( + valueFieldIndex, "_value_bytes", new VarBinaryType(Integer.MAX_VALUE))); + RowType combinedType = new RowType(combinedFields); + + int[] sortFields = IntStream.range(0, keyType.getFieldCount()).toArray(); + BinaryExternalSortBuffer sortBuffer = + BinaryExternalSortBuffer.create( + ioManager, + combinedType, + sortFields, + sortSpillBufferSize, + pageSize, + maxNumFileHandles, + compression, + MemorySize.MAX_VALUE, + false); + RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType); - for (DataFileMeta file : restoreFiles) { - if (file.level() == 0) { - continue; - } - int fileId = fileLevels.getFileIdByName(file.fileName()); - // Read with DV (auto-skips deleted rows). Use FileRecordIterator.returnedPosition() - // to get correct physical positions even after DV filtering. 
- try (RecordReader reader = keyReaderFactory.createRecordReader(file)) { - FileRecordIterator batch; - while ((batch = (FileRecordIterator) reader.readBatch()) != null) { - KeyValue kv; - while ((kv = batch.next()) != null) { - int position = (int) batch.returnedPosition(); - byte[] keyBytes = keySerializer.serializeToBytes(kv.key()); - ByteArrayOutputStream value = new ByteArrayOutputStream(8); - encodeInt(value, fileId); - encodeInt(value, position); - kvDb.put(keyBytes, value.toByteArray()); + InternalRow.FieldGetter[] keyFieldGetters = + new InternalRow.FieldGetter[keyType.getFieldCount()]; + for (int i = 0; i < keyType.getFieldCount(); i++) { + keyFieldGetters[i] = InternalRow.createFieldGetter(keyType.getTypeAt(i), i); + } + try { + // First pass: read all keys and write to sort buffer + for (DataFileMeta file : restoreFiles) { + if (file.level() == 0) { + continue; + } + int fileId = fileLevels.getFileIdByName(file.fileName()); + try (RecordReader reader = keyReaderFactory.createRecordReader(file)) { + FileRecordIterator batch; + while ((batch = (FileRecordIterator) reader.readBatch()) != null) { + KeyValue kv; + while ((kv = batch.next()) != null) { + int position = (int) batch.returnedPosition(); + ByteArrayOutputStream valueOut = new ByteArrayOutputStream(8); + encodeInt(valueOut, fileId); + encodeInt(valueOut, position); + byte[] valueBytes = valueOut.toByteArray(); + + GenericRow combinedRow = new GenericRow(combinedType.getFieldCount()); + for (int i = 0; i < keyType.getFieldCount(); i++) { + combinedRow.setField( + i, keyFieldGetters[i].getFieldOrNull(kv.key())); + } + combinedRow.setField(valueFieldIndex, valueBytes); + sortBuffer.write(combinedRow); + } + batch.releaseBatch(); } - batch.releaseBatch(); } - } catch (Exception e) { - throw new RuntimeException(e); } + + // Second pass: read sorted output and bulk-load into kvDb + MutableObjectIterator sortedIterator = sortBuffer.sortedIterator(); + BinaryRow binaryRow = new BinaryRow(combinedType.getFieldCount()); + InternalRow.FieldGetter valueGetter = + InternalRow.createFieldGetter( + new VarBinaryType(Integer.MAX_VALUE), valueFieldIndex); + + Iterator> entryIterator = + new Iterator>() { + private BinaryRow current = binaryRow; + private boolean hasNext; + + { + advance(); + } + + private void advance() { + try { + current = sortedIterator.next(current); + hasNext = current != null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public Map.Entry next() { + byte[] key = keySerializer.serializeToBytes(current); + byte[] value = (byte[]) valueGetter.getFieldOrNull(current); + advance(); + return new AbstractMap.SimpleImmutableEntry<>(key, value); + } + }; + + kvDb.bulkLoad(entryIterator); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + sortBuffer.clear(); } } From 654901212fb182b7fa3de286e300725224f27b83 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sat, 21 Mar 2026 12:33:41 +0800 Subject: [PATCH 2/2] [core] Add bulk load for bootstraping in ClusteringCompactManager --- .../clustering/ClusteringCompactManager.java | 737 ++---------------- .../clustering/ClusteringFileRewriter.java | 457 +++++++++++ .../clustering/ClusteringKeyIndex.java | 317 ++++++++ .../paimon/separated/ClusteringTableTest.java | 50 ++ 4 files changed, 880 insertions(+), 681 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java create mode 100644 
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java index 72cbc8078459..9e925d0764a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java @@ -27,63 +27,29 @@ import org.apache.paimon.compact.CompactFutureManager; import org.apache.paimon.compact.CompactResult; import org.apache.paimon.compact.CompactTask; -import org.apache.paimon.compression.BlockCompressionFactory; import org.apache.paimon.compression.CompressOptions; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.data.serializer.BinaryRowSerializer; -import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.RowCompactedSerializer; import org.apache.paimon.deletionvectors.BucketedDvMaintainer; -import org.apache.paimon.disk.ChannelReaderInputView; -import org.apache.paimon.disk.ChannelReaderInputViewIterator; -import org.apache.paimon.disk.ChannelWithMeta; -import org.apache.paimon.disk.ChannelWriterOutputView; -import org.apache.paimon.disk.FileChannelUtil; -import org.apache.paimon.disk.FileIOChannel; import org.apache.paimon.disk.IOManager; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; -import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.lookup.sort.db.SimpleLsmKvDb; import org.apache.paimon.operation.metrics.CompactionMetrics; -import org.apache.paimon.options.MemorySize; -import org.apache.paimon.reader.FileRecordIterator; -import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.reader.RecordReader.RecordIterator; -import org.apache.paimon.sort.BinaryExternalSortBuffer; -import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; -import org.apache.paimon.types.VarBinaryType; -import org.apache.paimon.utils.CloseableIterator; -import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer; -import org.apache.paimon.utils.MutableObjectIterator; import javax.annotation.Nullable; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.PriorityQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.stream.IntStream; import static java.util.Collections.singletonList; -import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt; -import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt; /** * Key Value clustering compact manager for {@link KeyValueFileStore}. 
@@ -102,27 +68,14 @@ public class ClusteringCompactManager extends CompactFutureManager { private final RowType keyType; private final RowType valueType; - private final long sortSpillBufferSize; - private final int pageSize; - private final int maxNumFileHandles; - private final int spillThreshold; - private final CompressOptions compression; - private final int[] clusteringColumns; - private final RecordComparator clusteringComparatorAlone; - private final RecordComparator clusteringComparatorInValue; - private final IOManager ioManager; - private final KeyValueFileReaderFactory keyReaderFactory; - private final KeyValueFileReaderFactory valueReaderFactory; - private final KeyValueFileWriterFactory writerFactory; private final ExecutorService executor; private final BucketedDvMaintainer dvMaintainer; - private final SimpleLsmKvDb kvDb; private final boolean lazyGenDeletionFile; - private final boolean firstRow; @Nullable private final CompactionMetrics.Reporter metricsReporter; private final ClusteringFiles fileLevels; - private final long targetFileSize; + private final ClusteringKeyIndex keyIndex; + private final ClusteringFileRewriter fileRewriter; public ClusteringCompactManager( RowType keyType, @@ -145,160 +98,64 @@ public ClusteringCompactManager( CompressOptions compression, boolean firstRow, @Nullable CompactionMetrics.Reporter metricsReporter) { - this.targetFileSize = targetFileSize; this.keyType = keyType; this.valueType = valueType; - this.sortSpillBufferSize = sortSpillBufferSize; - this.pageSize = pageSize; - this.maxNumFileHandles = maxNumFileHandles; - this.spillThreshold = spillThreshold; - this.compression = compression; - this.firstRow = firstRow; - this.clusteringColumns = valueType.projectIndexes(clusteringColumns); - this.clusteringComparatorAlone = - CodeGenUtils.newRecordComparator( - valueType.project(clusteringColumns).getFieldTypes(), - IntStream.range(0, clusteringColumns.size()).toArray(), - true); - this.clusteringComparatorInValue = - CodeGenUtils.newRecordComparator( - valueType.getFieldTypes(), this.clusteringColumns, true); - this.ioManager = ioManager; - this.keyReaderFactory = keyReaderFactory; - this.valueReaderFactory = valueReaderFactory; - this.writerFactory = writerFactory; this.executor = executor; this.dvMaintainer = dvMaintainer; this.lazyGenDeletionFile = lazyGenDeletionFile; this.metricsReporter = metricsReporter; + this.fileLevels = new ClusteringFiles(); restoreFiles.forEach(this::addNewFile); - this.kvDb = + int[] clusteringColumnIndexes = valueType.projectIndexes(clusteringColumns); + RecordComparator clusteringComparatorAlone = + CodeGenUtils.newRecordComparator( + valueType.project(clusteringColumns).getFieldTypes(), + IntStream.range(0, clusteringColumns.size()).toArray(), + true); + RecordComparator clusteringComparatorInValue = + CodeGenUtils.newRecordComparator( + valueType.getFieldTypes(), clusteringColumnIndexes, true); + + SimpleLsmKvDb kvDb = SimpleLsmKvDb.builder(new File(ioManager.pickRandomTempDir())) .cacheManager(cacheManager) .keyComparator(new RowCompactedSerializer(keyType).createSliceComparator()) .build(); - bootstrapKeyIndex(restoreFiles); - } - - private void bootstrapKeyIndex(List restoreFiles) { - // Build a combined RowType: key fields + one VARBINARY field for the encoded value - List combinedFields = new ArrayList<>(); - List keyFields = keyType.getFields(); - for (int i = 0; i < keyFields.size(); i++) { - DataField kf = keyFields.get(i); - combinedFields.add(new DataField(i, kf.name(), kf.type())); - } - int 
valueFieldIndex = keyFields.size(); - combinedFields.add( - new DataField( - valueFieldIndex, "_value_bytes", new VarBinaryType(Integer.MAX_VALUE))); - RowType combinedType = new RowType(combinedFields); - int[] sortFields = IntStream.range(0, keyType.getFieldCount()).toArray(); - BinaryExternalSortBuffer sortBuffer = - BinaryExternalSortBuffer.create( + this.keyIndex = + new ClusteringKeyIndex( + keyType, ioManager, - combinedType, - sortFields, + keyReaderFactory, + dvMaintainer, + kvDb, + fileLevels, + firstRow, sortSpillBufferSize, pageSize, maxNumFileHandles, - compression, - MemorySize.MAX_VALUE, - false); - - RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType); - InternalRow.FieldGetter[] keyFieldGetters = - new InternalRow.FieldGetter[keyType.getFieldCount()]; - for (int i = 0; i < keyType.getFieldCount(); i++) { - keyFieldGetters[i] = InternalRow.createFieldGetter(keyType.getTypeAt(i), i); - } - try { - // First pass: read all keys and write to sort buffer - for (DataFileMeta file : restoreFiles) { - if (file.level() == 0) { - continue; - } - int fileId = fileLevels.getFileIdByName(file.fileName()); - try (RecordReader reader = keyReaderFactory.createRecordReader(file)) { - FileRecordIterator batch; - while ((batch = (FileRecordIterator) reader.readBatch()) != null) { - KeyValue kv; - while ((kv = batch.next()) != null) { - int position = (int) batch.returnedPosition(); - ByteArrayOutputStream valueOut = new ByteArrayOutputStream(8); - encodeInt(valueOut, fileId); - encodeInt(valueOut, position); - byte[] valueBytes = valueOut.toByteArray(); - - GenericRow combinedRow = new GenericRow(combinedType.getFieldCount()); - for (int i = 0; i < keyType.getFieldCount(); i++) { - combinedRow.setField( - i, keyFieldGetters[i].getFieldOrNull(kv.key())); - } - combinedRow.setField(valueFieldIndex, valueBytes); - sortBuffer.write(combinedRow); - } - batch.releaseBatch(); - } - } - } - - // Second pass: read sorted output and bulk-load into kvDb - MutableObjectIterator sortedIterator = sortBuffer.sortedIterator(); - BinaryRow binaryRow = new BinaryRow(combinedType.getFieldCount()); - InternalRow.FieldGetter valueGetter = - InternalRow.createFieldGetter( - new VarBinaryType(Integer.MAX_VALUE), valueFieldIndex); - - Iterator> entryIterator = - new Iterator>() { - private BinaryRow current = binaryRow; - private boolean hasNext; - - { - advance(); - } - - private void advance() { - try { - current = sortedIterator.next(current); - hasNext = current != null; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean hasNext() { - return hasNext; - } - - @Override - public Map.Entry next() { - byte[] key = keySerializer.serializeToBytes(current); - byte[] value = (byte[]) valueGetter.getFieldOrNull(current); - advance(); - return new AbstractMap.SimpleImmutableEntry<>(key, value); - } - }; - - kvDb.bulkLoad(entryIterator); - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - sortBuffer.clear(); - } - } - - private CloseableIterator readKeyIterator(DataFileMeta file) throws IOException { - //noinspection resource - return keyReaderFactory - .createRecordReader(file) - .transform(KeyValue::key) - .toCloseableIterator(); + compression); + keyIndex.bootstrap(restoreFiles); + + this.fileRewriter = + new ClusteringFileRewriter( + keyType, + valueType, + clusteringColumnIndexes, + clusteringComparatorAlone, + clusteringComparatorInValue, + ioManager, + valueReaderFactory, + writerFactory, + fileLevels, + targetFileSize, + 
sortSpillBufferSize, + pageSize, + maxNumFileHandles, + spillThreshold, + compression); } @Override @@ -334,7 +191,6 @@ protected CompactResult doCompact() throws Exception { } private CompactResult compact(boolean fullCompaction) throws Exception { - RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType); KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType); RowType kvSchemaType = KeyValue.schema(keyType, valueType); @@ -346,25 +202,32 @@ private CompactResult compact(boolean fullCompaction) throws Exception { List existingSortedFiles = fileLevels.sortedFiles(); for (DataFileMeta file : unsortedFiles) { List sortedFiles = - sortAndRewriteFiles(singletonList(file), kvSerializer, kvSchemaType); - updateKeyIndex(keySerializer, file, sortedFiles); + fileRewriter.sortAndRewriteFiles( + singletonList(file), kvSerializer, kvSchemaType); + keyIndex.updateIndex(file, sortedFiles); result.before().add(file); result.after().addAll(sortedFiles); } // Phase 2: Universal Compaction on sorted files that existed before Phase 1. - // Files produced by Phase 1 are excluded to avoid the same file appearing in both - // result.before() and result.after(). List> mergeGroups; if (fullCompaction) { mergeGroups = singletonList(existingSortedFiles); } else { - mergeGroups = pickMergeCandidates(existingSortedFiles); + mergeGroups = fileRewriter.pickMergeCandidates(existingSortedFiles); } for (List mergeGroup : mergeGroups) { if (mergeGroup.size() >= 2) { - List mergedFiles = mergeAndRewriteFiles(mergeGroup, keySerializer); + // Delete key index entries before merge + for (DataFileMeta file : mergeGroup) { + keyIndex.deleteIndex(file); + } + List mergedFiles = fileRewriter.mergeAndRewriteFiles(mergeGroup); + // Rebuild key index for new files + for (DataFileMeta newFile : mergedFiles) { + keyIndex.rebuildIndex(newFile); + } result.before().addAll(mergeGroup); result.after().addAll(mergedFiles); } @@ -378,494 +241,6 @@ private CompactResult compact(boolean fullCompaction) throws Exception { return result; } - /** - * Pick merge candidate groups based on clustering column range overlap and file sizes. - * - *
-     *   1. Group into sections: Files are sorted by minKey and grouped into sections based
-     *      on clustering column key range overlap. Overlapping files belong to the same section.
-     *   2. Merge adjacent sections: Sections that have overlapping files (size >= 2) or
-     *      are small (total size < targetFileSize/2) are accumulated together. Large
-     *      single-file sections act as barriers, flushing accumulated files into a merge group.
- * - * @param sortedFiles all sorted files - * @return list of merge groups; each group contains files to merge together - */ - private List> pickMergeCandidates(List sortedFiles) { - if (sortedFiles.size() < 2) { - return java.util.Collections.emptyList(); - } - - // Step 1: Group files into sections based on clustering column range overlap. - List> sections = groupIntoSections(sortedFiles); - - // Step 2: Merge adjacent sections when beneficial to reduce small files. - // A section should be merged if it has overlapping files (size >= 2) or is small. - long smallSectionThreshold = targetFileSize / 2; - List> mergeGroups = new ArrayList<>(); - List pending = new ArrayList<>(); - - for (List section : sections) { - boolean needsMerge = section.size() >= 2; - boolean isSmall = sectionSize(section) < smallSectionThreshold; - - if (needsMerge || isSmall) { - // This section should be merged, accumulate it - pending.addAll(section); - } else { - // This section is a single large file, flush pending if any - if (pending.size() >= 2) { - mergeGroups.add(new ArrayList<>(pending)); - } - pending.clear(); - } - } - - // Flush remaining pending files - if (pending.size() >= 2) { - mergeGroups.add(pending); - } - - return mergeGroups; - } - - private long sectionSize(List section) { - long total = 0; - for (DataFileMeta file : section) { - total += file.fileSize(); - } - return total; - } - - /** - * Group files into sections based on clustering column key range overlap. Files are first - * sorted by minKey, then adjacent files with overlapping ranges are grouped into the same - * section. - * - * @param files input files - * @return list of sections, each section contains overlapping files - */ - private List> groupIntoSections(List files) { - // Sort files by minKey to properly detect overlapping ranges - List sorted = new ArrayList<>(files); - sorted.sort((a, b) -> clusteringComparatorAlone.compare(a.minKey(), b.minKey())); - - List> sections = new ArrayList<>(); - List currentSection = new ArrayList<>(); - currentSection.add(sorted.get(0)); - BinaryRow currentMax = sorted.get(0).maxKey(); - - for (int i = 1; i < sorted.size(); i++) { - DataFileMeta file = sorted.get(i); - if (clusteringComparatorAlone.compare(currentMax, file.minKey()) >= 0) { - // Overlaps with current section - currentSection.add(file); - if (clusteringComparatorAlone.compare(file.maxKey(), currentMax) > 0) { - currentMax = file.maxKey(); - } - } else { - sections.add(currentSection); - currentSection = new ArrayList<>(); - currentSection.add(file); - currentMax = file.maxKey(); - } - } - sections.add(currentSection); - return sections; - } - - /** - * Update the key index for a single original file replaced by new sorted files. Marks old key - * positions in deletion vectors and registers new positions. - */ - private void updateKeyIndex( - RowCompactedSerializer keySerializer, - DataFileMeta originalFile, - List newSortedFiles) - throws Exception { - updateKeyIndex(keySerializer, singletonList(originalFile), newSortedFiles); - } - - /** - * Update the key index for multiple original files replaced by new sorted files. - * - *

For DEDUPLICATE mode: mark the old position in deletion vectors, keep the new position. - * - *

For FIRST_ROW mode: if key exists, mark the new position in deletion vectors (keep the - * first/old one); if key is new, store the new position. - */ - private void updateKeyIndex( - RowCompactedSerializer keySerializer, - List originalFiles, - List newSortedFiles) - throws Exception { - // Collect file names of original files to avoid self-deletion marking - java.util.Set originalFileNames = new java.util.HashSet<>(); - for (DataFileMeta file : originalFiles) { - originalFileNames.add(file.fileName()); - } - - for (DataFileMeta sortedFile : newSortedFiles) { - int fileId = fileLevels.getFileIdByName(sortedFile.fileName()); - int position = 0; - try (CloseableIterator iterator = readKeyIterator(sortedFile)) { - while (iterator.hasNext()) { - byte[] key = keySerializer.serializeToBytes(iterator.next()); - byte[] oldValue = kvDb.get(key); - if (oldValue != null) { - ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue); - int oldFileId = decodeInt(valueIn); - int oldPosition = decodeInt(valueIn); - DataFileMeta oldFile = fileLevels.getFileById(oldFileId); - if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) { - if (firstRow) { - // First-row mode: keep the old (first) record, delete the new one - dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position); - position++; - continue; - } else { - // Deduplicate mode: keep the new record, delete the old one - dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition); - } - } - } - ByteArrayOutputStream value = new ByteArrayOutputStream(8); - encodeInt(value, fileId); - encodeInt(value, position); - kvDb.put(key, value.toByteArray()); - position++; - } - } - } - } - - /** - * Sort and rewrite one or more unsorted files by clustering columns. Reads all KeyValue records - * from the input files, sorts them using an external sort buffer, and writes to new level-1 - * files. - */ - private List sortAndRewriteFiles( - List inputFiles, KeyValueSerializer kvSerializer, RowType kvSchemaType) - throws Exception { - int[] sortFieldsInKeyValue = - Arrays.stream(clusteringColumns) - .map(i -> i + keyType.getFieldCount() + 2) - .toArray(); - BinaryExternalSortBuffer sortBuffer = - BinaryExternalSortBuffer.create( - ioManager, - kvSchemaType, - sortFieldsInKeyValue, - sortSpillBufferSize, - pageSize, - maxNumFileHandles, - compression, - MemorySize.MAX_VALUE, - false); - - for (DataFileMeta file : inputFiles) { - try (RecordReader reader = valueReaderFactory.createRecordReader(file)) { - try (CloseableIterator iterator = reader.toCloseableIterator()) { - while (iterator.hasNext()) { - KeyValue kv = iterator.next(); - InternalRow serializedRow = kvSerializer.toRow(kv); - sortBuffer.write(serializedRow); - } - } - } - } - - RollingFileWriter writer = - writerFactory.createRollingClusteringFileWriter(); - try { - MutableObjectIterator sortedIterator = sortBuffer.sortedIterator(); - BinaryRow binaryRow = new BinaryRow(kvSchemaType.getFieldCount()); - while ((binaryRow = sortedIterator.next(binaryRow)) != null) { - KeyValue kv = kvSerializer.fromRow(binaryRow); - writer.write( - kv.copy( - new InternalRowSerializer(keyType), - new InternalRowSerializer(valueType))); - } - } finally { - sortBuffer.clear(); - writer.close(); - } - - List newFiles = writer.result(); - for (DataFileMeta file : inputFiles) { - fileLevels.removeFile(file); - } - for (DataFileMeta newFile : newFiles) { - fileLevels.addNewFile(newFile); - } - - return newFiles; - } - - /** - * Merge sorted files using min-heap based multi-way merge. 
Since all input files are already - * sorted by clustering columns, we use a PriorityQueue to merge them efficiently without - * re-sorting. Key index entries are deleted during reading and rebuilt after writing. - * - *

When the number of input files exceeds spillThreshold, smaller files are spilled to - * row-based temp files first. Row-based iterators consume much less memory than columnar file - * readers. - */ - private List mergeAndRewriteFiles( - List inputFiles, RowCompactedSerializer keySerializer) throws Exception { - InternalRowSerializer keyRowSerializer = new InternalRowSerializer(keyType); - InternalRowSerializer valueRowSerializer = new InternalRowSerializer(valueType); - - // Delete key index entries for all input files before reading - for (DataFileMeta file : inputFiles) { - deleteKeyIndexForFile(keySerializer, file); - } - - // Determine which files to spill to row-based temp files - List filesToSpill = new ArrayList<>(); - List filesToKeep = new ArrayList<>(); - if (inputFiles.size() > spillThreshold) { - List sortedBySize = new ArrayList<>(inputFiles); - sortedBySize.sort(Comparator.comparingLong(DataFileMeta::fileSize)); - int spillCount = inputFiles.size() - spillThreshold; - filesToSpill = new ArrayList<>(sortedBySize.subList(0, spillCount)); - filesToKeep = new ArrayList<>(sortedBySize.subList(spillCount, sortedBySize.size())); - } else { - filesToKeep = inputFiles; - } - - // Spill smaller files to row-based temp files - List spilledChannels = new ArrayList<>(); - for (DataFileMeta file : filesToSpill) { - spilledChannels.add(spillToRowBasedFile(file)); - } - - // Open iterators and initialize the min-heap - List> openIterators = new ArrayList<>(); - PriorityQueue minHeap = - new PriorityQueue<>( - (a, b) -> - clusteringComparatorInValue.compare( - a.currentKeyValue.value(), b.currentKeyValue.value())); - - try { - // Add iterators for columnar files (kept in memory) - for (DataFileMeta file : filesToKeep) { - @SuppressWarnings("resource") - CloseableIterator iterator = - valueReaderFactory.createRecordReader(file).toCloseableIterator(); - openIterators.add(iterator); - if (iterator.hasNext()) { - KeyValue firstKv = iterator.next().copy(keyRowSerializer, valueRowSerializer); - minHeap.add(new MergeEntry(firstKv, iterator)); - } - } - - // Add iterators for row-based spilled files (low memory consumption) - for (SpilledChannel spilled : spilledChannels) { - CloseableIterator iterator = spilled.createIterator(); - openIterators.add(iterator); - if (iterator.hasNext()) { - KeyValue firstKv = iterator.next().copy(keyRowSerializer, valueRowSerializer); - minHeap.add(new MergeEntry(firstKv, iterator)); - } - } - - // Multi-way merge: write records in sorted order - RollingFileWriter writer = - writerFactory.createRollingClusteringFileWriter(); - try { - while (!minHeap.isEmpty()) { - MergeEntry entry = minHeap.poll(); - writer.write(entry.currentKeyValue); - if (entry.iterator.hasNext()) { - entry.currentKeyValue = - entry.iterator.next().copy(keyRowSerializer, valueRowSerializer); - minHeap.add(entry); - } - } - } finally { - writer.close(); - } - - // Remove original files and register new sorted files - List newFiles = writer.result(); - for (DataFileMeta file : inputFiles) { - fileLevels.removeFile(file); - } - for (DataFileMeta newFile : newFiles) { - fileLevels.addNewFile(newFile); - } - - // Rebuild key index for the new files - for (DataFileMeta newFile : newFiles) { - int fileId = fileLevels.getFileIdByName(newFile.fileName()); - int position = 0; - try (CloseableIterator keyIterator = readKeyIterator(newFile)) { - while (keyIterator.hasNext()) { - byte[] key = keySerializer.serializeToBytes(keyIterator.next()); - ByteArrayOutputStream value = new ByteArrayOutputStream(8); - 
encodeInt(value, fileId); - encodeInt(value, position); - kvDb.put(key, value.toByteArray()); - position++; - } - } - } - - return newFiles; - } finally { - for (CloseableIterator iterator : openIterators) { - try { - iterator.close(); - } catch (Exception ignored) { - } - } - } - } - - /** - * Spill a columnar DataFileMeta to a row-based temp file. Row-based files consume much less - * memory when reading compared to columnar files. - */ - private SpilledChannel spillToRowBasedFile(DataFileMeta file) throws Exception { - FileIOChannel.ID channel = ioManager.createChannel(); - KeyValueWithLevelNoReusingSerializer serializer = - new KeyValueWithLevelNoReusingSerializer(keyType, valueType); - BlockCompressionFactory compressFactory = BlockCompressionFactory.create(compression); - int compressBlock = (int) MemorySize.parse("64 kb").getBytes(); - - ChannelWithMeta channelWithMeta; - ChannelWriterOutputView out = - FileChannelUtil.createOutputView( - ioManager, channel, compressFactory, compressBlock); - try (RecordReader reader = valueReaderFactory.createRecordReader(file)) { - RecordIterator batch; - KeyValue record; - while ((batch = reader.readBatch()) != null) { - while ((record = batch.next()) != null) { - serializer.serialize(record, out); - } - batch.releaseBatch(); - } - } finally { - out.close(); - channelWithMeta = - new ChannelWithMeta(channel, out.getBlockCount(), out.getWriteBytes()); - } - - return new SpilledChannel(channelWithMeta, compressFactory, compressBlock, serializer); - } - - /** Holds metadata for a spilled row-based temp file. */ - private class SpilledChannel { - private final ChannelWithMeta channel; - private final BlockCompressionFactory compressFactory; - private final int compressBlock; - private final KeyValueWithLevelNoReusingSerializer serializer; - - SpilledChannel( - ChannelWithMeta channel, - BlockCompressionFactory compressFactory, - int compressBlock, - KeyValueWithLevelNoReusingSerializer serializer) { - this.channel = channel; - this.compressFactory = compressFactory; - this.compressBlock = compressBlock; - this.serializer = serializer; - } - - CloseableIterator createIterator() throws IOException { - ChannelReaderInputView view = - FileChannelUtil.createInputView( - ioManager, channel, new ArrayList<>(), compressFactory, compressBlock); - BinaryRowSerializer rowSerializer = new BinaryRowSerializer(serializer.numFields()); - ChannelReaderInputViewIterator iterator = - new ChannelReaderInputViewIterator(view, null, rowSerializer); - return new SpilledChannelIterator(view, iterator, serializer); - } - } - - /** Iterator that reads KeyValue records from a spilled row-based temp file. 
*/ - private static class SpilledChannelIterator implements CloseableIterator { - private final ChannelReaderInputView view; - private final ChannelReaderInputViewIterator iterator; - private final KeyValueWithLevelNoReusingSerializer serializer; - private KeyValue next; - - SpilledChannelIterator( - ChannelReaderInputView view, - ChannelReaderInputViewIterator iterator, - KeyValueWithLevelNoReusingSerializer serializer) { - this.view = view; - this.iterator = iterator; - this.serializer = serializer; - } - - @Override - public boolean hasNext() { - if (next != null) { - return true; - } - try { - BinaryRow row = iterator.next(); - if (row == null) { - return false; - } - next = serializer.fromRow(row); - return true; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public KeyValue next() { - if (!hasNext()) { - throw new java.util.NoSuchElementException(); - } - KeyValue result = next; - next = null; - return result; - } - - @Override - public void close() throws Exception { - view.getChannel().closeAndDelete(); - } - } - - /** Delete key index entries for the given file from kvDb (only if they still point to it). */ - private void deleteKeyIndexForFile(RowCompactedSerializer keySerializer, DataFileMeta file) - throws Exception { - int fileId = fileLevels.getFileIdByName(file.fileName()); - try (CloseableIterator iterator = readKeyIterator(file)) { - while (iterator.hasNext()) { - byte[] key = keySerializer.serializeToBytes(iterator.next()); - byte[] value = kvDb.get(key); - if (value != null) { - int storedFileId = decodeInt(new ByteArrayInputStream(value)); - if (storedFileId == fileId) { - kvDb.delete(key); - } - } - } - } - } - - /** Entry in the min-heap for multi-way merge, holding the current KeyValue and its iterator. */ - private static class MergeEntry { - KeyValue currentKeyValue; - final CloseableIterator iterator; - - MergeEntry(KeyValue currentKeyValue, CloseableIterator iterator) { - this.currentKeyValue = currentKeyValue; - this.iterator = iterator; - } - } - @Override public Optional getCompactionResult(boolean blocking) throws ExecutionException, InterruptedException { @@ -879,6 +254,6 @@ public boolean compactNotCompleted() { @Override public void close() throws IOException { - kvDb.close(); + keyIndex.close(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java new file mode 100644 index 000000000000..ece48137645d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.mergetree.compact.clustering; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.KeyValueSerializer; +import org.apache.paimon.codegen.RecordComparator; +import org.apache.paimon.compression.BlockCompressionFactory; +import org.apache.paimon.compression.CompressOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.disk.ChannelReaderInputView; +import org.apache.paimon.disk.ChannelReaderInputViewIterator; +import org.apache.paimon.disk.ChannelWithMeta; +import org.apache.paimon.disk.ChannelWriterOutputView; +import org.apache.paimon.disk.FileChannelUtil; +import org.apache.paimon.disk.FileIOChannel; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.KeyValueFileReaderFactory; +import org.apache.paimon.io.KeyValueFileWriterFactory; +import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.RecordReader.RecordIterator; +import org.apache.paimon.sort.BinaryExternalSortBuffer; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer; +import org.apache.paimon.utils.MutableObjectIterator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +/** + * Handles file rewriting for clustering compaction, including sorting unsorted files (Phase 1) and + * merging sorted files via multi-way merge (Phase 2). 
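+ * <p>A sketch of the intended call sequence, mirroring {@code ClusteringCompactManager#compact}
+ * (variable names are illustrative; the caller is also responsible for updating the key index
+ * around these calls):
+ *
+ * <pre>{@code
+ * // Phase 1: sort each unsorted (level-0) file individually
+ * List<DataFileMeta> sorted =
+ *         rewriter.sortAndRewriteFiles(singletonList(file), kvSerializer, kvSchemaType);
+ *
+ * // Phase 2: merge groups of overlapping or small sorted files
+ * for (List<DataFileMeta> group : rewriter.pickMergeCandidates(sortedFiles)) {
+ *     if (group.size() >= 2) {
+ *         List<DataFileMeta> merged = rewriter.mergeAndRewriteFiles(group);
+ *     }
+ * }
+ * }</pre>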
+ */ +public class ClusteringFileRewriter { + + private final RowType keyType; + private final RowType valueType; + private final int[] clusteringColumns; + private final RecordComparator clusteringComparatorAlone; + private final RecordComparator clusteringComparatorInValue; + private final IOManager ioManager; + private final KeyValueFileReaderFactory valueReaderFactory; + private final KeyValueFileWriterFactory writerFactory; + private final ClusteringFiles fileLevels; + private final long targetFileSize; + private final long sortSpillBufferSize; + private final int pageSize; + private final int maxNumFileHandles; + private final int spillThreshold; + private final CompressOptions compression; + + public ClusteringFileRewriter( + RowType keyType, + RowType valueType, + int[] clusteringColumns, + RecordComparator clusteringComparatorAlone, + RecordComparator clusteringComparatorInValue, + IOManager ioManager, + KeyValueFileReaderFactory valueReaderFactory, + KeyValueFileWriterFactory writerFactory, + ClusteringFiles fileLevels, + long targetFileSize, + long sortSpillBufferSize, + int pageSize, + int maxNumFileHandles, + int spillThreshold, + CompressOptions compression) { + this.keyType = keyType; + this.valueType = valueType; + this.clusteringColumns = clusteringColumns; + this.clusteringComparatorAlone = clusteringComparatorAlone; + this.clusteringComparatorInValue = clusteringComparatorInValue; + this.ioManager = ioManager; + this.valueReaderFactory = valueReaderFactory; + this.writerFactory = writerFactory; + this.fileLevels = fileLevels; + this.targetFileSize = targetFileSize; + this.sortSpillBufferSize = sortSpillBufferSize; + this.pageSize = pageSize; + this.maxNumFileHandles = maxNumFileHandles; + this.spillThreshold = spillThreshold; + this.compression = compression; + } + + /** + * Sort and rewrite unsorted files by clustering columns. Reads all KeyValue records, sorts them + * using an external sort buffer, and writes to new level-1 files. 
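+ * <p>Illustration of the sort-field mapping used below (hypothetical field counts): with a
+ * 2-field key and clustering columns {0, 3} of the value type, the sort indexes into the
+ * serialized {@code KeyValue.schema(keyType, valueType)} row are {0 + 2 + 2, 3 + 2 + 2} =
+ * {4, 7}, skipping the key fields plus the two system columns (sequence number and value
+ * kind) that sit between the key and value fields.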
+ */ + public List sortAndRewriteFiles( + List inputFiles, KeyValueSerializer kvSerializer, RowType kvSchemaType) + throws Exception { + int[] sortFieldsInKeyValue = + Arrays.stream(clusteringColumns) + .map(i -> i + keyType.getFieldCount() + 2) + .toArray(); + BinaryExternalSortBuffer sortBuffer = + BinaryExternalSortBuffer.create( + ioManager, + kvSchemaType, + sortFieldsInKeyValue, + sortSpillBufferSize, + pageSize, + maxNumFileHandles, + compression, + MemorySize.MAX_VALUE, + false); + + for (DataFileMeta file : inputFiles) { + try (RecordReader reader = valueReaderFactory.createRecordReader(file)) { + try (CloseableIterator iterator = reader.toCloseableIterator()) { + while (iterator.hasNext()) { + KeyValue kv = iterator.next(); + InternalRow serializedRow = kvSerializer.toRow(kv); + sortBuffer.write(serializedRow); + } + } + } + } + + RollingFileWriter writer = + writerFactory.createRollingClusteringFileWriter(); + try { + MutableObjectIterator sortedIterator = sortBuffer.sortedIterator(); + BinaryRow binaryRow = new BinaryRow(kvSchemaType.getFieldCount()); + while ((binaryRow = sortedIterator.next(binaryRow)) != null) { + KeyValue kv = kvSerializer.fromRow(binaryRow); + writer.write( + kv.copy( + new InternalRowSerializer(keyType), + new InternalRowSerializer(valueType))); + } + } finally { + sortBuffer.clear(); + writer.close(); + } + + List newFiles = writer.result(); + for (DataFileMeta file : inputFiles) { + fileLevels.removeFile(file); + } + for (DataFileMeta newFile : newFiles) { + fileLevels.addNewFile(newFile); + } + + return newFiles; + } + + /** + * Pick merge candidate groups based on clustering column range overlap and file sizes. + * + * @param sortedFiles all sorted files + * @return list of merge groups; each group contains files to merge together + */ + public List> pickMergeCandidates(List sortedFiles) { + if (sortedFiles.size() < 2) { + return Collections.emptyList(); + } + + List> sections = groupIntoSections(sortedFiles); + + long smallSectionThreshold = targetFileSize / 2; + List> mergeGroups = new ArrayList<>(); + List pending = new ArrayList<>(); + + for (List section : sections) { + boolean needsMerge = section.size() >= 2; + boolean isSmall = sectionSize(section) < smallSectionThreshold; + + if (needsMerge || isSmall) { + pending.addAll(section); + } else { + if (pending.size() >= 2) { + mergeGroups.add(new ArrayList<>(pending)); + } + pending.clear(); + } + } + + if (pending.size() >= 2) { + mergeGroups.add(pending); + } + + return mergeGroups; + } + + /** + * Merge sorted files using min-heap based multi-way merge. Key index entries are deleted before + * reading and rebuilt after writing by the caller. 
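+ * <p>For illustration (hypothetical numbers): with {@code spillThreshold = 8} and 11 input
+ * files, the 3 smallest files by {@code fileSize()} are first spilled to row-based temp files,
+ * so at most 8 columnar readers are open at the same time during the heap-based merge.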
+ */ + public List mergeAndRewriteFiles(List inputFiles) throws Exception { + InternalRowSerializer keyRowSerializer = new InternalRowSerializer(keyType); + InternalRowSerializer valueRowSerializer = new InternalRowSerializer(valueType); + + // Determine which files to spill to row-based temp files + List filesToSpill = new ArrayList<>(); + List filesToKeep; + if (inputFiles.size() > spillThreshold) { + List sortedBySize = new ArrayList<>(inputFiles); + sortedBySize.sort(Comparator.comparingLong(DataFileMeta::fileSize)); + int spillCount = inputFiles.size() - spillThreshold; + filesToSpill = new ArrayList<>(sortedBySize.subList(0, spillCount)); + filesToKeep = new ArrayList<>(sortedBySize.subList(spillCount, sortedBySize.size())); + } else { + filesToKeep = inputFiles; + } + + // Spill smaller files to row-based temp files + List spilledChannels = new ArrayList<>(); + for (DataFileMeta file : filesToSpill) { + spilledChannels.add(spillToRowBasedFile(file)); + } + + // Open iterators and initialize the min-heap + List> openIterators = new ArrayList<>(); + PriorityQueue minHeap = + new PriorityQueue<>( + (a, b) -> + clusteringComparatorInValue.compare( + a.currentKeyValue.value(), b.currentKeyValue.value())); + + try { + for (DataFileMeta file : filesToKeep) { + @SuppressWarnings("resource") + CloseableIterator iterator = + valueReaderFactory.createRecordReader(file).toCloseableIterator(); + openIterators.add(iterator); + if (iterator.hasNext()) { + KeyValue firstKv = iterator.next().copy(keyRowSerializer, valueRowSerializer); + minHeap.add(new MergeEntry(firstKv, iterator)); + } + } + + for (SpilledChannel spilled : spilledChannels) { + CloseableIterator iterator = spilled.createIterator(); + openIterators.add(iterator); + if (iterator.hasNext()) { + KeyValue firstKv = iterator.next().copy(keyRowSerializer, valueRowSerializer); + minHeap.add(new MergeEntry(firstKv, iterator)); + } + } + + RollingFileWriter writer = + writerFactory.createRollingClusteringFileWriter(); + try { + while (!minHeap.isEmpty()) { + MergeEntry entry = minHeap.poll(); + writer.write(entry.currentKeyValue); + if (entry.iterator.hasNext()) { + entry.currentKeyValue = + entry.iterator.next().copy(keyRowSerializer, valueRowSerializer); + minHeap.add(entry); + } + } + } finally { + writer.close(); + } + + List newFiles = writer.result(); + for (DataFileMeta file : inputFiles) { + fileLevels.removeFile(file); + } + for (DataFileMeta newFile : newFiles) { + fileLevels.addNewFile(newFile); + } + + return newFiles; + } finally { + for (CloseableIterator iterator : openIterators) { + try { + iterator.close(); + } catch (Exception ignored) { + } + } + } + } + + private List> groupIntoSections(List files) { + List sorted = new ArrayList<>(files); + sorted.sort((a, b) -> clusteringComparatorAlone.compare(a.minKey(), b.minKey())); + + List> sections = new ArrayList<>(); + List currentSection = new ArrayList<>(); + currentSection.add(sorted.get(0)); + BinaryRow currentMax = sorted.get(0).maxKey(); + + for (int i = 1; i < sorted.size(); i++) { + DataFileMeta file = sorted.get(i); + if (clusteringComparatorAlone.compare(currentMax, file.minKey()) >= 0) { + currentSection.add(file); + if (clusteringComparatorAlone.compare(file.maxKey(), currentMax) > 0) { + currentMax = file.maxKey(); + } + } else { + sections.add(currentSection); + currentSection = new ArrayList<>(); + currentSection.add(file); + currentMax = file.maxKey(); + } + } + sections.add(currentSection); + return sections; + } + + private long sectionSize(List section) { + 
long total = 0; + for (DataFileMeta file : section) { + total += file.fileSize(); + } + return total; + } + + private SpilledChannel spillToRowBasedFile(DataFileMeta file) throws Exception { + FileIOChannel.ID channel = ioManager.createChannel(); + KeyValueWithLevelNoReusingSerializer serializer = + new KeyValueWithLevelNoReusingSerializer(keyType, valueType); + BlockCompressionFactory compressFactory = BlockCompressionFactory.create(compression); + int compressBlock = (int) MemorySize.parse("64 kb").getBytes(); + + ChannelWithMeta channelWithMeta; + ChannelWriterOutputView out = + FileChannelUtil.createOutputView( + ioManager, channel, compressFactory, compressBlock); + try (RecordReader reader = valueReaderFactory.createRecordReader(file)) { + RecordIterator batch; + KeyValue record; + while ((batch = reader.readBatch()) != null) { + while ((record = batch.next()) != null) { + serializer.serialize(record, out); + } + batch.releaseBatch(); + } + } finally { + out.close(); + channelWithMeta = + new ChannelWithMeta(channel, out.getBlockCount(), out.getWriteBytes()); + } + + return new SpilledChannel(channelWithMeta, compressFactory, compressBlock, serializer); + } + + /** Holds metadata for a spilled row-based temp file. */ + private class SpilledChannel { + private final ChannelWithMeta channel; + private final BlockCompressionFactory compressFactory; + private final int compressBlock; + private final KeyValueWithLevelNoReusingSerializer serializer; + + SpilledChannel( + ChannelWithMeta channel, + BlockCompressionFactory compressFactory, + int compressBlock, + KeyValueWithLevelNoReusingSerializer serializer) { + this.channel = channel; + this.compressFactory = compressFactory; + this.compressBlock = compressBlock; + this.serializer = serializer; + } + + CloseableIterator createIterator() throws IOException { + ChannelReaderInputView view = + FileChannelUtil.createInputView( + ioManager, channel, new ArrayList<>(), compressFactory, compressBlock); + BinaryRowSerializer rowSerializer = new BinaryRowSerializer(serializer.numFields()); + ChannelReaderInputViewIterator iterator = + new ChannelReaderInputViewIterator(view, null, rowSerializer); + return new SpilledChannelIterator(view, iterator, serializer); + } + } + + /** Iterator that reads KeyValue records from a spilled row-based temp file. */ + private static class SpilledChannelIterator implements CloseableIterator { + private final ChannelReaderInputView view; + private final ChannelReaderInputViewIterator iterator; + private final KeyValueWithLevelNoReusingSerializer serializer; + private KeyValue next; + + SpilledChannelIterator( + ChannelReaderInputView view, + ChannelReaderInputViewIterator iterator, + KeyValueWithLevelNoReusingSerializer serializer) { + this.view = view; + this.iterator = iterator; + this.serializer = serializer; + } + + @Override + public boolean hasNext() { + if (next != null) { + return true; + } + try { + BinaryRow row = iterator.next(); + if (row == null) { + return false; + } + next = serializer.fromRow(row); + return true; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public KeyValue next() { + if (!hasNext()) { + throw new java.util.NoSuchElementException(); + } + KeyValue result = next; + next = null; + return result; + } + + @Override + public void close() throws Exception { + view.getChannel().closeAndDelete(); + } + } + + /** Entry in the min-heap for multi-way merge. 
*/ + private static class MergeEntry { + KeyValue currentKeyValue; + final CloseableIterator iterator; + + MergeEntry(KeyValue currentKeyValue, CloseableIterator iterator) { + this.currentKeyValue = currentKeyValue; + this.iterator = iterator; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java new file mode 100644 index 000000000000..d7234345f67c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact.clustering; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.compression.CompressOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.RowCompactedSerializer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.KeyValueFileReaderFactory; +import org.apache.paimon.lookup.sort.db.SimpleLsmKvDb; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.sort.BinaryExternalSortBuffer; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.MutableObjectIterator; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.IntStream; + +import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt; +import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt; + +/** + * Manages the primary key index for clustering compaction. Maps each primary key to its file + * location (fileId + row position) using a {@link SimpleLsmKvDb}. 
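+ *
+ * <p>The indexed value is a compact encoding of the file id and the row position, written with
+ * the same var-length helpers used by the update/rebuild paths below, e.g.:
+ *
+ * <pre>{@code
+ * ByteArrayOutputStream value = new ByteArrayOutputStream(8);
+ * encodeInt(value, fileId); // id of the data file that currently holds the key
+ * encodeInt(value, position); // row position of the key inside that file
+ * byte[] indexValue = value.toByteArray();
+ * }</pre>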
+ */ +public class ClusteringKeyIndex implements Closeable { + + private final RowType keyType; + private final IOManager ioManager; + private final KeyValueFileReaderFactory keyReaderFactory; + private final BucketedDvMaintainer dvMaintainer; + private final SimpleLsmKvDb kvDb; + private final ClusteringFiles fileLevels; + private final boolean firstRow; + private final long sortSpillBufferSize; + private final int pageSize; + private final int maxNumFileHandles; + private final CompressOptions compression; + + public ClusteringKeyIndex( + RowType keyType, + IOManager ioManager, + KeyValueFileReaderFactory keyReaderFactory, + BucketedDvMaintainer dvMaintainer, + SimpleLsmKvDb kvDb, + ClusteringFiles fileLevels, + boolean firstRow, + long sortSpillBufferSize, + int pageSize, + int maxNumFileHandles, + CompressOptions compression) { + this.keyType = keyType; + this.ioManager = ioManager; + this.keyReaderFactory = keyReaderFactory; + this.dvMaintainer = dvMaintainer; + this.kvDb = kvDb; + this.fileLevels = fileLevels; + this.firstRow = firstRow; + this.sortSpillBufferSize = sortSpillBufferSize; + this.pageSize = pageSize; + this.maxNumFileHandles = maxNumFileHandles; + this.compression = compression; + } + + /** Bootstrap the key index from existing sorted files using external sort + bulk load. */ + public void bootstrap(List restoreFiles) { + List combinedFields = new ArrayList<>(); + List keyFields = keyType.getFields(); + for (int i = 0; i < keyFields.size(); i++) { + DataField kf = keyFields.get(i); + combinedFields.add(new DataField(i, kf.name(), kf.type())); + } + int valueFieldIndex = keyFields.size(); + combinedFields.add( + new DataField( + valueFieldIndex, "_value_bytes", new VarBinaryType(Integer.MAX_VALUE))); + RowType combinedType = new RowType(combinedFields); + + int[] sortFields = IntStream.range(0, keyType.getFieldCount()).toArray(); + BinaryExternalSortBuffer sortBuffer = + BinaryExternalSortBuffer.create( + ioManager, + combinedType, + sortFields, + sortSpillBufferSize, + pageSize, + maxNumFileHandles, + compression, + MemorySize.MAX_VALUE, + false); + + RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType); + InternalRow.FieldGetter[] keyFieldGetters = + new InternalRow.FieldGetter[keyType.getFieldCount()]; + for (int i = 0; i < keyType.getFieldCount(); i++) { + keyFieldGetters[i] = InternalRow.createFieldGetter(keyType.getTypeAt(i), i); + } + try { + for (DataFileMeta file : restoreFiles) { + if (file.level() == 0) { + continue; + } + int fileId = fileLevels.getFileIdByName(file.fileName()); + try (RecordReader reader = keyReaderFactory.createRecordReader(file)) { + FileRecordIterator batch; + while ((batch = (FileRecordIterator) reader.readBatch()) != null) { + KeyValue kv; + while ((kv = batch.next()) != null) { + int position = (int) batch.returnedPosition(); + ByteArrayOutputStream valueOut = new ByteArrayOutputStream(8); + encodeInt(valueOut, fileId); + encodeInt(valueOut, position); + byte[] valueBytes = valueOut.toByteArray(); + + GenericRow combinedRow = new GenericRow(combinedType.getFieldCount()); + for (int i = 0; i < keyType.getFieldCount(); i++) { + combinedRow.setField( + i, keyFieldGetters[i].getFieldOrNull(kv.key())); + } + combinedRow.setField(valueFieldIndex, valueBytes); + sortBuffer.write(combinedRow); + } + batch.releaseBatch(); + } + } + } + + MutableObjectIterator sortedIterator = sortBuffer.sortedIterator(); + BinaryRow binaryRow = new BinaryRow(combinedType.getFieldCount()); + InternalRow.FieldGetter valueGetter = + 
+                    InternalRow.createFieldGetter(
+                            new VarBinaryType(Integer.MAX_VALUE), valueFieldIndex);
+
+            Iterator<Map.Entry<byte[], byte[]>> entryIterator =
+                    new Iterator<Map.Entry<byte[], byte[]>>() {
+                        private BinaryRow current = binaryRow;
+                        private boolean hasNext;
+
+                        {
+                            advance();
+                        }
+
+                        private void advance() {
+                            try {
+                                current = sortedIterator.next(current);
+                                hasNext = current != null;
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+
+                        @Override
+                        public boolean hasNext() {
+                            return hasNext;
+                        }
+
+                        @Override
+                        public Map.Entry<byte[], byte[]> next() {
+                            byte[] key = keySerializer.serializeToBytes(current);
+                            byte[] value = (byte[]) valueGetter.getFieldOrNull(current);
+                            advance();
+                            return new AbstractMap.SimpleImmutableEntry<>(key, value);
+                        }
+                    };
+
+            kvDb.bulkLoad(entryIterator);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            sortBuffer.clear();
+        }
+    }
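+
+    // Note on the bulk load above: SimpleLsmKvDb#bulkLoad expects an empty database and entries
+    // already sorted by the DB's key comparator (see its Javadoc); the external sort over the
+    // key fields is what this bootstrap relies on to produce that ordering.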
+
+    /**
+     * Update the key index after a single original file is replaced by new sorted files.
+     *
+     * <p>For DEDUPLICATE mode: mark the old position in deletion vectors, keep the new position.
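+     *
+     * <p>For example (DEDUPLICATE, hypothetical ids): if key {@code k} is currently indexed at
+     * (fileId = 3, position = 17) and {@code k} reappears in a new sorted file at position 5,
+     * the old location receives a deletion-vector entry and the index entry is rewritten to
+     * point at the new (fileId, 5).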
+     *
+     * <p>For FIRST_ROW mode: if key exists, mark the new position in deletion vectors (keep the
+     * first/old one); if key is new, store the new position.
+     */
+    public void updateIndex(DataFileMeta originalFile, List<DataFileMeta> newSortedFiles)
+            throws Exception {
+        updateIndex(Collections.singletonList(originalFile), newSortedFiles);
+    }
+
+    /**
+     * Update the key index after multiple original files are replaced by new sorted files.
+     *
+     * @see #updateIndex(DataFileMeta, List)
+     */
+    public void updateIndex(List<DataFileMeta> originalFiles, List<DataFileMeta> newSortedFiles)
+            throws Exception {
+        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+
+        Set<String> originalFileNames = new HashSet<>();
+        for (DataFileMeta file : originalFiles) {
+            originalFileNames.add(file.fileName());
+        }
+
+        for (DataFileMeta sortedFile : newSortedFiles) {
+            int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
+            int position = 0;
+            try (CloseableIterator<InternalRow> iterator = readKeyIterator(sortedFile)) {
+                while (iterator.hasNext()) {
+                    byte[] key = keySerializer.serializeToBytes(iterator.next());
+                    byte[] oldValue = kvDb.get(key);
+                    if (oldValue != null) {
+                        ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
+                        int oldFileId = decodeInt(valueIn);
+                        int oldPosition = decodeInt(valueIn);
+                        DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
+                        if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
+                            if (firstRow) {
+                                dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
+                                position++;
+                                continue;
+                            } else {
+                                dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
+                            }
+                        }
+                    }
+                    ByteArrayOutputStream value = new ByteArrayOutputStream(8);
+                    encodeInt(value, fileId);
+                    encodeInt(value, position);
+                    kvDb.put(key, value.toByteArray());
+                    position++;
+                }
+            }
+        }
+    }
+
+    /** Delete key index entries for the given file (only if they still point to it). */
+    public void deleteIndex(DataFileMeta file) throws Exception {
+        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+        int fileId = fileLevels.getFileIdByName(file.fileName());
+        try (CloseableIterator<InternalRow> iterator = readKeyIterator(file)) {
+            while (iterator.hasNext()) {
+                byte[] key = keySerializer.serializeToBytes(iterator.next());
+                byte[] value = kvDb.get(key);
+                if (value != null) {
+                    int storedFileId = decodeInt(new ByteArrayInputStream(value));
+                    if (storedFileId == fileId) {
+                        kvDb.delete(key);
+                    }
+                }
+            }
+        }
+    }
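+
+    // Unlike updateIndex, rebuildIndex below does not consult existing entries or produce
+    // deletion vectors; it unconditionally points every key of the new file at (fileId, position).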
+
+    /** Rebuild key index entries for a newly written file. */
+    public void rebuildIndex(DataFileMeta newFile) throws Exception {
+        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+        int fileId = fileLevels.getFileIdByName(newFile.fileName());
+        int position = 0;
+        try (CloseableIterator<InternalRow> keyIterator = readKeyIterator(newFile)) {
+            while (keyIterator.hasNext()) {
+                byte[] key = keySerializer.serializeToBytes(keyIterator.next());
+                ByteArrayOutputStream value = new ByteArrayOutputStream(8);
+                encodeInt(value, fileId);
+                encodeInt(value, position);
+                kvDb.put(key, value.toByteArray());
+                position++;
+            }
+        }
+    }
+
+    private CloseableIterator<InternalRow> readKeyIterator(DataFileMeta file) throws IOException {
+        //noinspection resource
+        return keyReaderFactory
+                .createRecordReader(file)
+                .transform(KeyValue::key)
+                .toCloseableIterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        kvDb.close();
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index f69f5fc6aed6..aaf661be2fac 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -336,6 +336,56 @@ public void testDeletionVectorCorrectness() throws Exception {
                 .containsExactlyInAnyOrder(GenericRow.of(1, 50), GenericRow.of(2, 60));
     }
 
+    /**
+     * Test that bootstrap correctly rebuilds the key index via bulkLoad from existing sorted
+     * files.
+     *
+     * <p>Each writeRows() call creates a new writer (and thus a new ClusteringCompactManager),
+     * which calls {@code keyIndex.bootstrap(restoreFiles)}. The bootstrap method reads all
+     * level > 0 files, sorts them externally, and bulk-loads into the LSM KV DB, bypassing the
+     * normal put-per-entry path. This test verifies that the bulkLoad-based index is correct by
+     * checking deduplication across multiple commits with overlapping keys.
+     */
+    @Test
+    public void testBootstrapBulkLoadIndex() throws Exception {
+        // Commit 1: write initial data; compaction produces level > 0 sorted files
+        writeRows(
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 30),
+                        GenericRow.of(4, 40),
+                        GenericRow.of(5, 50)));
+
+        // Commit 2: the new writer bootstraps the index from level > 0 files via bulkLoad, then
+        // writes overlapping keys; updateIndex must find the existing entries in the bulk-loaded
+        // index to generate correct deletion vectors
+        writeRows(
+                Arrays.asList(GenericRow.of(1, 100), GenericRow.of(3, 300), GenericRow.of(5, 500)));
+
+        // Verify dedup: keys 1, 3, 5 updated; keys 2, 4 unchanged
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 100),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 300),
+                        GenericRow.of(4, 40),
+                        GenericRow.of(5, 500));
+
+        // Commit 3: another bootstrap from the updated sorted files verifies that bulkLoad works
+        // correctly after files have been rewritten
+        writeRows(
+                Arrays.asList(GenericRow.of(2, 200), GenericRow.of(4, 400), GenericRow.of(6, 600)));
+
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 100),
+                        GenericRow.of(2, 200),
+                        GenericRow.of(3, 300),
+                        GenericRow.of(4, 400),
+                        GenericRow.of(5, 500),
+                        GenericRow.of(6, 600));
+    }
+
     // ==================== Clustering Column Filter Tests ====================
 
     /** Test that equality filter on clustering column skips irrelevant files in the scan plan. */