Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -94,6 +95,7 @@ public class SimpleLsmKvDb implements Closeable {
private final SortLookupStoreFactory storeFactory;
private final Comparator<MemorySlice> keyComparator;
private final long memTableFlushThreshold;
private final long maxSstFileSize;
private final LsmCompactor compactor;

/** Active MemTable: key -> value bytes (empty byte[] = tombstone). */
Expand Down Expand Up @@ -127,6 +129,7 @@ private SimpleLsmKvDb(
this.storeFactory = storeFactory;
this.keyComparator = keyComparator;
this.memTableFlushThreshold = memTableFlushThreshold;
this.maxSstFileSize = maxSstFileSize;
this.memTable = new TreeMap<>(keyComparator);
this.memTableSize = 0;
this.levels = new ArrayList<>();
Expand Down Expand Up @@ -225,6 +228,89 @@ public void delete(byte[] key) throws IOException {
maybeFlushMemTable();
}

/**
* Bulk-load globally sorted entries directly into SST files at the deepest level, bypassing
* MemTable, flush, and compaction entirely. The database must be empty when this is called.
*
* @param sortedEntries an iterator of key-value pairs in sorted order (by the DB's key
* comparator)
*/
public void bulkLoad(Iterator<Map.Entry<byte[], byte[]>> sortedEntries) throws IOException {
ensureOpen();
if (!memTable.isEmpty() || getSstFileCount() > 0) {
throw new IllegalStateException(
"bulkLoad requires an empty database (no memTable entries and no SST files)");
}

int targetLevel = MAX_LEVELS - 1;
List<SstFileMetadata> targetLevelFiles = levels.get(targetLevel);

SortLookupStoreWriter currentWriter = null;
File currentSstFile = null;
MemorySlice currentFileMinKey = null;
MemorySlice currentFileMaxKey = null;
long currentBatchSize = 0;

try {
while (sortedEntries.hasNext()) {
Map.Entry<byte[], byte[]> entry = sortedEntries.next();
byte[] key = entry.getKey();
byte[] value = entry.getValue();

if (currentWriter == null) {
currentSstFile = newSstFile();
currentWriter = storeFactory.createWriter(currentSstFile, null);
currentFileMinKey = MemorySlice.wrap(key);
currentBatchSize = 0;
}

currentWriter.put(key, value);
currentFileMaxKey = MemorySlice.wrap(key);
currentBatchSize += key.length + value.length;

if (currentBatchSize >= maxSstFileSize) {
currentWriter.close();
targetLevelFiles.add(
new SstFileMetadata(
currentSstFile,
currentFileMinKey,
currentFileMaxKey,
0,
targetLevel));
currentWriter = null;
currentSstFile = null;
currentFileMinKey = null;
currentFileMaxKey = null;
}
}

if (currentWriter != null) {
currentWriter.close();
targetLevelFiles.add(
new SstFileMetadata(
currentSstFile,
currentFileMinKey,
currentFileMaxKey,
0,
targetLevel));
}
} catch (IOException | RuntimeException e) {
if (currentWriter != null) {
try {
currentWriter.close();
} catch (IOException suppressed) {
e.addSuppressed(suppressed);
}
}
throw e;
}

LOG.info(
"Bulk-loaded {} SST files directly to level {}",
targetLevelFiles.size(),
targetLevel);
}

// -------------------------------------------------------------------------
// Read Operations
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@

import java.io.File;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;

Expand Down Expand Up @@ -1321,6 +1325,126 @@ public void testGroupMergePreservesDeleteSemantics() throws IOException {
}
}

@Test
public void testBulkLoad() throws IOException {
try (SimpleLsmKvDb db = createDb()) {
// Prepare sorted entries
List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
for (int i = 0; i < 100; i++) {
String key = String.format("key-%05d", i);
String value = String.format("value-%05d", i);
entries.add(
new AbstractMap.SimpleImmutableEntry<>(
key.getBytes(UTF_8), value.getBytes(UTF_8)));
}

db.bulkLoad(entries.iterator());

// All data at deepest level, no L0 files
Assertions.assertEquals(0, db.getLevelFileCount(0));
Assertions.assertTrue(db.getLevelFileCount(SimpleLsmKvDb.MAX_LEVELS - 1) > 0);

// All keys should be readable
for (int i = 0; i < 100; i++) {
String expected = String.format("value-%05d", i);
String actual = getString(db, String.format("key-%05d", i));
Assertions.assertEquals(expected, actual, "Mismatch at index " + i);
}
}
}

@Test
public void testBulkLoadMultipleSstFiles() throws IOException {
// Use a small maxSstFileSize to force multiple SST files
SimpleLsmKvDb db =
SimpleLsmKvDb.builder(new File(tempDir.toFile(), "bulk-multi-db"))
.memTableFlushThreshold(1024)
.maxSstFileSize(512)
.blockSize(128)
.level0FileNumCompactTrigger(4)
.compressOptions(new CompressOptions("none", 1))
.build();

try {
List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
for (int i = 0; i < 200; i++) {
String key = String.format("key-%05d", i);
String value = String.format("value-%05d", i);
entries.add(
new AbstractMap.SimpleImmutableEntry<>(
key.getBytes(UTF_8), value.getBytes(UTF_8)));
}

db.bulkLoad(entries.iterator());

// Multiple SST files should be created at the deepest level
int deepestLevelFiles = db.getLevelFileCount(SimpleLsmKvDb.MAX_LEVELS - 1);
Assertions.assertTrue(
deepestLevelFiles > 1,
"Expected multiple SST files at deepest level, got " + deepestLevelFiles);
Assertions.assertEquals(0, db.getLevelFileCount(0));

// All keys should be readable
for (int i = 0; i < 200; i++) {
String expected = String.format("value-%05d", i);
String actual = getString(db, String.format("key-%05d", i));
Assertions.assertEquals(expected, actual, "Mismatch at index " + i);
}
} finally {
db.close();
}
}

@Test
public void testBulkLoadEmptyIterator() throws IOException {
try (SimpleLsmKvDb db = createDb()) {
List<Map.Entry<byte[], byte[]>> empty = new ArrayList<>();
db.bulkLoad(empty.iterator());

Assertions.assertEquals(0, db.getSstFileCount());
Assertions.assertNull(getString(db, "any-key"));
}
}

@Test
public void testBulkLoadThenPutAndGet() throws IOException {
try (SimpleLsmKvDb db = createDb()) {
// Bulk load initial data
List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
for (int i = 0; i < 50; i++) {
String key = String.format("key-%05d", i);
String value = String.format("value-%05d", i);
entries.add(
new AbstractMap.SimpleImmutableEntry<>(
key.getBytes(UTF_8), value.getBytes(UTF_8)));
}
db.bulkLoad(entries.iterator());

// Now use normal put to add/overwrite data
putString(db, "key-00000", "overwritten");
putString(db, "key-99999", "new-key");

Assertions.assertEquals("overwritten", getString(db, "key-00000"));
Assertions.assertEquals("new-key", getString(db, "key-99999"));
Assertions.assertEquals("value-00025", getString(db, String.format("key-%05d", 25)));
}
}

@Test
public void testBulkLoadFailsOnNonEmptyDb() throws IOException {
try (SimpleLsmKvDb db = createDb()) {
putString(db, "existing", "data");

List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
entries.add(
new AbstractMap.SimpleImmutableEntry<>(
"key".getBytes(UTF_8), "value".getBytes(UTF_8)));

Assertions.assertThrows(
IllegalStateException.class, () -> db.bulkLoad(entries.iterator()));
}
}

private static void putString(SimpleLsmKvDb db, String key, String value) throws IOException {
db.put(key.getBytes(UTF_8), value.getBytes(UTF_8));
}
Expand Down
Loading