From 398aafb58dc2fcb22b5086d38da66455b94f3fc7 Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Tue, 7 Apr 2026 12:50:49 +0800 Subject: [PATCH 1/4] fix #4105 When journal and dbledgerstorage on the same disk, concurrent checkpointComplete calls from SyncThread and SingleDirectoryDbLedgerStorage.flush() can overwrite lastMark backwards,causing journal files to be deleted while still referenced by the older mark. --- .../org/apache/bookkeeper/bookie/Journal.java | 21 ++++ .../storage/ldb/DbLedgerStorageTest.java | 118 ++++++++++++++++++ 2 files changed, 139 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 2d68b8d2da8..730f3a2c895 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -623,6 +623,13 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour private final LastLogMark lastLogMark = new LastLogMark(0, 0); + // Guards checkpointComplete to ensure lastMark only advances forward. + // Without this, concurrent checkpointComplete calls from SyncThread and + // SingleDirectoryDbLedgerStorage.flush() can overwrite lastMark backwards, + // causing journal files to be deleted while still referenced by the older mark. + private final Object checkpointLock = new Object(); + private final LogMark lastPersistedMark = new LogMark(0, 0); + private static final String LAST_MARK_DEFAULT_NAME = "lastMark"; private final String lastMarkFileName; @@ -766,6 +773,11 @@ public Checkpoint newCheckpoint() { /** * Telling journal a checkpoint is finished. * + *

Multiple threads (SyncThread and DbLedgerStorage flush) may call this concurrently. + * A monotonic check ensures the lastMark file only advances forward — a later call with + * an older mark is safely skipped. This prevents the race where a slower flush overwrites + * lastMark backwards, causing referenced journal files to be garbage collected prematurely. + * * @throws IOException */ @Override @@ -776,6 +788,15 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint) checkpoint; LastLogMark mark = lmcheckpoint.mark; + // Monotonic check: only advance lastMark forward, never backwards. + synchronized (checkpointLock) { + if (mark.getCurMark().compare(lastPersistedMark) <= 0) { + return; + } + lastPersistedMark.setLogMark( + mark.getCurMark().getLogFileId(), mark.getCurMark().getLogFileOffset()); + } + mark.rollLog(mark); if (compact) { // list the journals that have been marked diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index dfc2459678b..b58cec54e5c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.bookie.CheckpointSourceList; import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.EntryLocation; +import org.apache.bookkeeper.bookie.Journal; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.LedgerStorage; import org.apache.bookkeeper.bookie.LogMark; @@ -881,4 +882,121 @@ public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedger bookie.getLedgerStorage().flush(); Assert.assertEquals(pendingDeletedLedgers.size(), 
0); } + + /** + * Simulates the full journal-missing scenario caused by concurrent checkpointComplete + * calls in single-dir mode and verifies the monotonic fix prevents it. + * + *

Timeline of the original bug: + *

+     * 1. SDLS.flush() starts: newCheckpoint → mark(5, 0), begins flushing data
+     * 2. SyncThread starts:   newCheckpoint → mark(7, 0), waits for flushMutex
+     * 3. SDLS.flush() completes flushing, releases flushMutex
+     * 4. SyncThread acquires flushMutex, finds writeCache empty, returns
+     * 5. SyncThread calls checkpointComplete(mark=7, compact=true) FIRST
+     *    → rollLog(7), GC deletes journal files 3,4,5,6 (all with id < 7)
+     * 6. SDLS.flush() calls checkpointComplete(mark=5, compact=true) SECOND
+     *    → rollLog(5) — OVERWRITES lastMark backwards from 7 to 5!
+     *    → journal file 5 was already deleted in step 5
+     * 7. Bookie restarts: reads lastMark=5, looks for journal file 5 → MISSING!
+     *    → throws "Recovery log 5 is missing"
+     * 
+ * + *

With the monotonic fix, step 6 is skipped (mark 5 <= lastPersistedMark 7), + * so lastMark stays at 7 and the restart succeeds. + */ + @Test + public void testConcurrentCheckpointCompleteJournalMissing() throws Exception { + File baseDir = new File(tmpDir, "journalMissingTest"); + File ledgerDir = new File(baseDir, "ledger"); + File journalBaseDir = new File(baseDir, "journal"); + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(1000); + conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4); + conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() }); + conf.setJournalDirName(journalBaseDir.getCanonicalPath()); + // Set maxBackupJournals to 0 so GC aggressively deletes all old journals + conf.setMaxBackupJournals(0); + + BookieImpl bookie = new TestBookieImpl(conf); + try { + Journal journal = bookie.getJournals().get(0); + File journalDir = journal.getJournalDirectory(); + File ledgerDirMark = new File(ledgerDir + "/current", "lastMark"); + + // Create fake journal files: 3.txn, 4.txn, 5.txn, 6.txn, 7.txn, 8.txn + for (long id = 3; id <= 8; id++) { + File journalFile = new File(journalDir, Long.toHexString(id) + ".txn"); + assertTrue("Failed to create journal file " + id, journalFile.createNewFile()); + } + + // Verify all journal files exist + for (long id = 3; id <= 8; id++) { + File journalFile = new File(journalDir, Long.toHexString(id) + ".txn"); + assertTrue("Journal file " + id + " should exist", journalFile.exists()); + } + + CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals()); + + // === Simulate the race === + + // Step 1: SDLS.flush() captures checkpoint at mark(5, 100) + journal.getLastLogMark().getCurMark().setLogMark(5, 100); + CheckpointSource.Checkpoint cpFlush = checkpointSource.newCheckpoint(); + + // Step 2: SyncThread 
captures checkpoint at mark(7, 200) — newer position + journal.getLastLogMark().getCurMark().setLogMark(7, 200); + CheckpointSource.Checkpoint cpSync = checkpointSource.newCheckpoint(); + + // Step 5: SyncThread completes FIRST — checkpointComplete(mark=7, compact=true) + // This should: rollLog to 7, GC journals with id < 7 (deletes 3,4,5,6) + checkpointSource.checkpointComplete(cpSync, true); + + LogMark markAfterSync = readLogMark(ledgerDirMark); + assertEquals("lastMark should be at 7 after SyncThread", 7, markAfterSync.getLogFileId()); + assertEquals(200, markAfterSync.getLogFileOffset()); + + // Verify journals 3,4,5,6 were GC'd, 7,8 still exist + for (long id = 3; id <= 6; id++) { + File journalFile = new File(journalDir, Long.toHexString(id) + ".txn"); + assertFalse("Journal " + id + " should have been GC'd", journalFile.exists()); + } + for (long id = 7; id <= 8; id++) { + File journalFile = new File(journalDir, Long.toHexString(id) + ".txn"); + assertTrue("Journal " + id + " should still exist", journalFile.exists()); + } + + // Step 6: SDLS.flush() completes SECOND — checkpointComplete(mark=5, compact=true) + // WITHOUT FIX: rollLog would overwrite lastMark to 5, but journal 5 is already deleted! + // WITH FIX: mark 5 <= lastPersistedMark 7, so this is skipped entirely. + checkpointSource.checkpointComplete(cpFlush, true); + + // Verify: lastMark must NOT regress to 5. Should stay at 7. 
+ LogMark markAfterFlush = readLogMark(ledgerDirMark); + assertEquals("lastMark must not regress after older checkpoint completes", + 7, markAfterFlush.getLogFileId()); + assertEquals(200, markAfterFlush.getLogFileOffset()); + + // Step 7: Simulate bookie restart — read lastMark and check journal exists + journal.getLastLogMark().readLog(); + LogMark restartMark = journal.getLastLogMark().getCurMark(); + assertEquals("Reloaded lastMark should be 7", 7, restartMark.getLogFileId()); + + // The journal file pointed to by lastMark must exist + File markedJournal = new File(journalDir, + Long.toHexString(restartMark.getLogFileId()) + ".txn"); + assertTrue("Journal file " + restartMark.getLogFileId() + " must exist for recovery", + markedJournal.exists()); + + // Verify that listJournalIds finds the expected journal for replay + List replayLogs = Journal.listJournalIds(journalDir, + journalId -> journalId >= restartMark.getLogFileId()); + assertTrue("Replay journal list must contain the marked journal", + replayLogs.size() > 0 && replayLogs.get(0) == restartMark.getLogFileId()); + } finally { + bookie.getLedgerStorage().shutdown(); + } + } } From 7a5309afa459661bae298fb3af6407005a539cb7 Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Wed, 8 Apr 2026 18:23:07 +0800 Subject: [PATCH 2/4] fix #4105 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When singleLedgerDirs=true, bookie can crash on restart with "Recovery log is missing" due to lastMark file being overwritten backwards. Root cause SyncThread.flush() has a nested call pattern that causes lastMark regression: SyncThread.flush(): 1. outerCheckpoint = newCheckpoint() → mark = (file 5, offset 100) 2. ledgerStorage.flush() → SingleDirectoryDbLedgerStorage.flush(): a. innerCheckpoint = newCheckpoint() → mark = (file 7, offset 200), journal has advanced b. flushes data to disk c. 
checkpointComplete(mark=7, compact=true) → persists lastMark=7, deletes journal files with id < 7 (including file 5) 3. checkpointComplete(mark=5, compact=false) → persists lastMark=5, overwriting the 7 written in step 2c 4. Bookie restarts → reads lastMark=5 → file 5 no longer exists → crash Step 1 captures the checkpoint before step 2 runs, so it is always older. Step 2c advances lastMark and garbage-collects old journals. Step 3 then overwrites lastMark backwards to a position whose journal file was already deleted. Conditions - singleLedgerDirs=true (journal and ledger on the same disk, so SingleDirectoryDbLedgerStorage.flush() calls checkpointComplete internally) - Journal file rotates between the two newCheckpoint() calls (requires sufficient write throughput) - maxBackupJournals small enough for old files to actually be deleted Fix Add a monotonic guard in checkpointComplete(): track the highest mark persisted so far, skip any call with an older mark. This prevents rollLog from overwriting lastMark backwards. --- .../org/apache/bookkeeper/bookie/Journal.java | 21 +++--- .../storage/ldb/DbLedgerStorageTest.java | 65 ++++++++++--------- 2 files changed, 48 insertions(+), 38 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 730f3a2c895..3af2f219121 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -624,9 +624,11 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour private final LastLogMark lastLogMark = new LastLogMark(0, 0); // Guards checkpointComplete to ensure lastMark only advances forward. 
- // Without this, concurrent checkpointComplete calls from SyncThread and - // SingleDirectoryDbLedgerStorage.flush() can overwrite lastMark backwards, - // causing journal files to be deleted while still referenced by the older mark. + // When singleLedgerDirs=true, SyncThread.flush() takes a checkpoint, then calls + // ledgerStorage.flush() → SingleDirectoryDbLedgerStorage.flush(), which internally + // takes a NEWER checkpoint and completes it (with garbage collection). When control + // returns, SyncThread completes its own OLDER checkpoint, overwriting lastMark backwards + // to a journal file already deleted by the inner garbage collection. private final Object checkpointLock = new Object(); private final LogMark lastPersistedMark = new LogMark(0, 0); @@ -773,10 +775,9 @@ public Checkpoint newCheckpoint() { /** * Telling journal a checkpoint is finished. * - *

Multiple threads (SyncThread and DbLedgerStorage flush) may call this concurrently. - * A monotonic check ensures the lastMark file only advances forward — a later call with - * an older mark is safely skipped. This prevents the race where a slower flush overwrites - * lastMark backwards, causing referenced journal files to be garbage collected prematurely. + *

If the given checkpoint is not newer than the last persisted mark, the call is + * a no-op. This monotonic guarantee prevents lastMark from regressing backwards. + * See the comment on {@code checkpointLock} for the full scenario. * * @throws IOException */ @@ -788,9 +789,11 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint) checkpoint; LastLogMark mark = lmcheckpoint.mark; - // Monotonic check: only advance lastMark forward, never backwards. + // Monotonic check: skip if this mark is not newer than what was already persisted. + // This prevents lastMark regression from nested checkpointComplete calls + // (see class-level comment on checkpointLock for the full scenario). synchronized (checkpointLock) { - if (mark.getCurMark().compare(lastPersistedMark) <= 0) { + if (mark.getCurMark().compare(lastPersistedMark) < 0) { return; } lastPersistedMark.setLogMark( diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index b58cec54e5c..c99c123a7cb 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -884,26 +884,29 @@ public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedger } /** - * Simulates the full journal-missing scenario caused by concurrent checkpointComplete - * calls in single-dir mode and verifies the monotonic fix prevents it. + * Simulates the journal-missing scenario caused by lastMark regression in single-dir + * mode (singleLedgerDirs=true) and verifies the monotonic fix prevents it. * - *

Timeline of the original bug: + *

The trigger is a single-thread nested call within SyncThread.flush(): *

-     * 1. SDLS.flush() starts: newCheckpoint → mark(5, 0), begins flushing data
-     * 2. SyncThread starts:   newCheckpoint → mark(7, 0), waits for flushMutex
-     * 3. SDLS.flush() completes flushing, releases flushMutex
-     * 4. SyncThread acquires flushMutex, finds writeCache empty, returns
-     * 5. SyncThread calls checkpointComplete(mark=7, compact=true) FIRST
-     *    → rollLog(7), GC deletes journal files 3,4,5,6 (all with id < 7)
-     * 6. SDLS.flush() calls checkpointComplete(mark=5, compact=true) SECOND
-     *    → rollLog(5) — OVERWRITES lastMark backwards from 7 to 5!
-     *    → journal file 5 was already deleted in step 5
-     * 7. Bookie restarts: reads lastMark=5, looks for journal file 5 → MISSING!
-     *    → throws "Recovery log 5 is missing"
+     * SyncThread.flush():
+     *   1. outerCheckpoint = newCheckpoint()          → captures mark(5, 100)
+     *   2. ledgerStorage.flush()                      → delegates to SingleDirectoryDbLedgerStorage.flush():
+     *      2a. innerCheckpoint = newCheckpoint()      → captures mark(7, 200), journal has advanced
+     *      2b. checkpoint(innerCheckpoint)            → flushes data to disk
+     *      2c. checkpointComplete(innerCheckpoint=7, compact=true)
+     *          → rollLog(7), garbage collects journal files 3,4,5,6 (id < 7)
+     *   3. checkpointComplete(outerCheckpoint=5, compact=false)
+     *      → rollLog(5) — OVERWRITES lastMark backwards from 7 to 5!
+     *      → journal file 5 was already deleted in step 2c
+     *   4. Bookie restarts: reads lastMark=5, looks for journal 5 → MISSING!
+     *      → throws "Recovery log 5 is missing"
      * 
* - *

With the monotonic fix, step 6 is skipped (mark 5 <= lastPersistedMark 7), - * so lastMark stays at 7 and the restart succeeds. + *

This test simulates the core regression: a newer checkpoint is completed first + * (with garbage collection), then an older checkpoint attempts to overwrite lastMark + * backwards. With the monotonic fix, the older mark is skipped + * (5 < lastPersistedMark 7), so lastMark stays at 7 and the restart succeeds. */ @Test public void testConcurrentCheckpointCompleteJournalMissing() throws Exception { @@ -917,7 +920,7 @@ public void testConcurrentCheckpointCompleteJournalMissing() throws Exception { conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() }); conf.setJournalDirName(journalBaseDir.getCanonicalPath()); - // Set maxBackupJournals to 0 so GC aggressively deletes all old journals + // Set maxBackupJournals to 0 so garbage collection aggressively deletes all old journals conf.setMaxBackupJournals(0); BookieImpl bookie = new TestBookieImpl(conf); @@ -940,38 +943,42 @@ public void testConcurrentCheckpointCompleteJournalMissing() throws Exception { CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals()); - // === Simulate the race === + // === Simulate the lastMark regression === - // Step 1: SDLS.flush() captures checkpoint at mark(5, 100) + // Step 1: SyncThread takes checkpoint at mark(5, 100) before calling ledgerStorage.flush() journal.getLastLogMark().getCurMark().setLogMark(5, 100); CheckpointSource.Checkpoint cpFlush = checkpointSource.newCheckpoint(); - // Step 2: SyncThread captures checkpoint at mark(7, 200) — newer position + // Step 2: Inside ledgerStorage.flush() → SingleDirectoryDbLedgerStorage.flush() + // takes a newer checkpoint at mark(7, 200) journal.getLastLogMark().getCurMark().setLogMark(7, 200); CheckpointSource.Checkpoint cpSync = checkpointSource.newCheckpoint(); - // Step 5: SyncThread completes FIRST — checkpointComplete(mark=7, compact=true) - // This should: rollLog to 7, GC journals with id < 7 (deletes 3,4,5,6) + // Step 3: 
SingleDirectoryDbLedgerStorage.flush() completes its checkpoint FIRST + // checkpointComplete(mark=7, compact=true) + // rollLog to 7, garbage collects journals with id < 7 (deletes 3,4,5,6) checkpointSource.checkpointComplete(cpSync, true); LogMark markAfterSync = readLogMark(ledgerDirMark); - assertEquals("lastMark should be at 7 after SyncThread", 7, markAfterSync.getLogFileId()); + assertEquals("lastMark should be at 7 after inner flush", 7, markAfterSync.getLogFileId()); assertEquals(200, markAfterSync.getLogFileOffset()); - // Verify journals 3,4,5,6 were GC'd, 7,8 still exist + // Verify journals 3,4,5,6 were garbage collected, 7,8 still exist for (long id = 3; id <= 6; id++) { File journalFile = new File(journalDir, Long.toHexString(id) + ".txn"); - assertFalse("Journal " + id + " should have been GC'd", journalFile.exists()); + assertFalse("Journal " + id + " should have been garbage collected", journalFile.exists()); } for (long id = 7; id <= 8; id++) { File journalFile = new File(journalDir, Long.toHexString(id) + ".txn"); assertTrue("Journal " + id + " should still exist", journalFile.exists()); } - // Step 6: SDLS.flush() completes SECOND — checkpointComplete(mark=5, compact=true) - // WITHOUT FIX: rollLog would overwrite lastMark to 5, but journal 5 is already deleted! - // WITH FIX: mark 5 <= lastPersistedMark 7, so this is skipped entirely. - checkpointSource.checkpointComplete(cpFlush, true); + // Step 4: Control returns to SyncThread, which completes its OLDER checkpoint + // checkpointComplete(mark=5, compact=false). The compact value does not matter here — + // the regression is caused by rollLog overwriting the lastMark file backwards. + // WITHOUT FIX: rollLog overwrites lastMark to 5, but journal 5 is already deleted! + // WITH FIX: mark 5 < lastPersistedMark 7, so this is skipped entirely. + checkpointSource.checkpointComplete(cpFlush, false); // Verify: lastMark must NOT regress to 5. Should stay at 7. 
LogMark markAfterFlush = readLogMark(ledgerDirMark); From 10cc005682624fb8b1c2d96b9172f8deb9362c29 Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Wed, 8 Apr 2026 18:33:58 +0800 Subject: [PATCH 3/4] fix #4105 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When singleLedgerDirs=true, bookie can crash on restart with "Recovery log is missing" due to lastMark file being overwritten backwards. Root cause SyncThread.flush() has a nested call pattern that causes lastMark regression: SyncThread.flush(): 1. outerCheckpoint = newCheckpoint() → mark = (file 5, offset 100) 2. ledgerStorage.flush() → SingleDirectoryDbLedgerStorage.flush(): a. innerCheckpoint = newCheckpoint() → mark = (file 7, offset 200), journal has advanced b. flushes data to disk c. checkpointComplete(mark=7, compact=true) → persists lastMark=7, deletes journal files with id < 7 (including file 5) 3. checkpointComplete(mark=5, compact=false) → persists lastMark=5, overwriting the 7 written in step 2c 4. Bookie restarts → reads lastMark=5 → file 5 no longer exists → crash Step 1 captures the checkpoint before step 2 runs, so it is always older. Step 2c advances lastMark and garbage-collects old journals. Step 3 then overwrites lastMark backwards to a position whose journal file was already deleted. Conditions - singleLedgerDirs=true (journal and ledger on the same disk, so SingleDirectoryDbLedgerStorage.flush() calls checkpointComplete internally) - Journal file rotates between the two newCheckpoint() calls (requires sufficient write throughput) - maxBackupJournals small enough for old files to actually be deleted Fix Add a monotonic guard in checkpointComplete(): track the highest mark persisted so far, skip any call with an older mark. This prevents rollLog from overwriting lastMark backwards. 
--- .../bookie/storage/ldb/DbLedgerStorageTest.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index c99c123a7cb..651ed02ef51 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -909,7 +909,7 @@ public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedger * (5 < lastPersistedMark 7), so lastMark stays at 7 and the restart succeeds. */ @Test - public void testConcurrentCheckpointCompleteJournalMissing() throws Exception { + public void testNestedCheckpointCompleteLastMarkRegression() throws Exception { File baseDir = new File(tmpDir, "journalMissingTest"); File ledgerDir = new File(baseDir, "ledger"); File journalBaseDir = new File(baseDir, "journal"); @@ -947,17 +947,17 @@ public void testConcurrentCheckpointCompleteJournalMissing() throws Exception { // Step 1: SyncThread takes checkpoint at mark(5, 100) before calling ledgerStorage.flush() journal.getLastLogMark().getCurMark().setLogMark(5, 100); - CheckpointSource.Checkpoint cpFlush = checkpointSource.newCheckpoint(); + CheckpointSource.Checkpoint outerCheckpoint = checkpointSource.newCheckpoint(); // Step 2: Inside ledgerStorage.flush() → SingleDirectoryDbLedgerStorage.flush() // takes a newer checkpoint at mark(7, 200) journal.getLastLogMark().getCurMark().setLogMark(7, 200); - CheckpointSource.Checkpoint cpSync = checkpointSource.newCheckpoint(); + CheckpointSource.Checkpoint innerCheckpoint = checkpointSource.newCheckpoint(); // Step 3: SingleDirectoryDbLedgerStorage.flush() completes its checkpoint FIRST // checkpointComplete(mark=7, compact=true) // rollLog to 7, garbage collects journals 
with id < 7 (deletes 3,4,5,6) - checkpointSource.checkpointComplete(cpSync, true); + checkpointSource.checkpointComplete(innerCheckpoint, true); LogMark markAfterSync = readLogMark(ledgerDirMark); assertEquals("lastMark should be at 7 after inner flush", 7, markAfterSync.getLogFileId()); @@ -978,7 +978,7 @@ public void testConcurrentCheckpointCompleteJournalMissing() throws Exception { // the regression is caused by rollLog overwriting the lastMark file backwards. // WITHOUT FIX: rollLog overwrites lastMark to 5, but journal 5 is already deleted! // WITH FIX: mark 5 < lastPersistedMark 7, so this is skipped entirely. - checkpointSource.checkpointComplete(cpFlush, false); + checkpointSource.checkpointComplete(outerCheckpoint, false); // Verify: lastMark must NOT regress to 5. Should stay at 7. LogMark markAfterFlush = readLogMark(ledgerDirMark); @@ -986,7 +986,9 @@ public void testConcurrentCheckpointCompleteJournalMissing() throws Exception { 7, markAfterFlush.getLogFileId()); assertEquals(200, markAfterFlush.getLogFileOffset()); - // Step 7: Simulate bookie restart — read lastMark and check journal exists + // Step 5: Simulate bookie restart — reset curMark to (0,0) first so readLog() + // actually loads from disk rather than being a no-op due to the in-memory value. + journal.getLastLogMark().getCurMark().setLogMark(0, 0); journal.getLastLogMark().readLog(); LogMark restartMark = journal.getLastLogMark().getCurMark(); assertEquals("Reloaded lastMark should be 7", 7, restartMark.getLogFileId()); From 6f966bee62b95dd615f206d08f24ef234305bb1e Mon Sep 17 00:00:00 2001 From: void-ptr974 Date: Wed, 8 Apr 2026 19:40:14 +0800 Subject: [PATCH 4/4] fix #4105 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When singleLedgerDirs=true, bookie can crash on restart with "Recovery log is missing" due to lastMark file being overwritten backwards. 
Root cause SyncThread.flush() has a nested call pattern that causes lastMark regression: SyncThread.flush(): 1. outerCheckpoint = newCheckpoint() → mark = (file 5, offset 100) 2. ledgerStorage.flush() → SingleDirectoryDbLedgerStorage.flush(): a. innerCheckpoint = newCheckpoint() → mark = (file 7, offset 200), journal has advanced b. flushes data to disk c. checkpointComplete(mark=7, compact=true) → persists lastMark=7, deletes journal files with id < 7 (including file 5) 3. checkpointComplete(mark=5, compact=false) → persists lastMark=5, overwriting the 7 written in step 2c 4. Bookie restarts → reads lastMark=5 → file 5 no longer exists → crash Step 1 captures the checkpoint before step 2 runs, so it is always older. Step 2c advances lastMark and garbage-collects old journals. Step 3 then overwrites lastMark backwards to a position whose journal file was already deleted. Conditions - singleLedgerDirs=true (journal and ledger on the same disk, so SingleDirectoryDbLedgerStorage.flush() calls checkpointComplete internally) - Journal file rotates between the two newCheckpoint() calls (requires sufficient write throughput) - maxBackupJournals small enough for old files to actually be deleted Fix Add a monotonic guard in checkpointComplete(): track the highest mark persisted so far, skip any call with an older mark. This prevents rollLog from overwriting lastMark backwards. 
--- .../org/apache/bookkeeper/bookie/Journal.java | 16 ++--- .../storage/ldb/DbLedgerStorageTest.java | 62 +++++++------------ 2 files changed, 27 insertions(+), 51 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 3af2f219121..76f5ae6ad57 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -623,12 +623,7 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour private final LastLogMark lastLogMark = new LastLogMark(0, 0); - // Guards checkpointComplete to ensure lastMark only advances forward. - // When singleLedgerDirs=true, SyncThread.flush() takes a checkpoint, then calls - // ledgerStorage.flush() → SingleDirectoryDbLedgerStorage.flush(), which internally - // takes a NEWER checkpoint and completes it (with garbage collection). When control - // returns, SyncThread completes its own OLDER checkpoint, overwriting lastMark backwards - // to a journal file already deleted by the inner garbage collection. + // Ensures lastMark only advances forward across concurrent checkpointComplete calls. private final Object checkpointLock = new Object(); private final LogMark lastPersistedMark = new LogMark(0, 0); @@ -775,9 +770,8 @@ public Checkpoint newCheckpoint() { /** * Telling journal a checkpoint is finished. * - *

If the given checkpoint is not newer than the last persisted mark, the call is - * a no-op. This monotonic guarantee prevents lastMark from regressing backwards. - * See the comment on {@code checkpointLock} for the full scenario. + *

Skips if the checkpoint is older than the last persisted mark, + * preventing lastMark from regressing backwards. + * + * @throws IOException */ @@ -789,9 +783,7 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint) checkpoint; LastLogMark mark = lmcheckpoint.mark; - // Monotonic check: skip if this mark is not newer than what was already persisted. - // This prevents lastMark regression from nested checkpointComplete calls - // (see class-level comment on checkpointLock for the full scenario). + // Skip if this mark is older than what was already persisted. NOTE(review): rollLog below runs outside checkpointLock, so two threads with in-order marks can still persist lastMark out of order (A passes the check with 7, is preempted; B persists 8 and GCs; A resumes and rolls back to 7) — consider performing rollLog while holding the lock; confirm against SyncThread call sites. synchronized (checkpointLock) { if (mark.getCurMark().compare(lastPersistedMark) < 0) { return; } lastPersistedMark.setLogMark( diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index 651ed02ef51..6ab136e1349 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -884,32 +884,13 @@ public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedger } /** - * Simulates the journal-missing scenario caused by lastMark regression in single-dir - * mode (singleLedgerDirs=true) and verifies the monotonic fix prevents it. - * - *

The trigger is a single-thread nested call within SyncThread.flush(): - *

-     * SyncThread.flush():
-     *   1. outerCheckpoint = newCheckpoint()          → captures mark(5, 100)
-     *   2. ledgerStorage.flush()                      → delegates to SingleDirectoryDbLedgerStorage.flush():
-     *      2a. innerCheckpoint = newCheckpoint()      → captures mark(7, 200), journal has advanced
-     *      2b. checkpoint(innerCheckpoint)            → flushes data to disk
-     *      2c. checkpointComplete(innerCheckpoint=7, compact=true)
-     *          → rollLog(7), garbage collects journal files 3,4,5,6 (id < 7)
-     *   3. checkpointComplete(outerCheckpoint=5, compact=false)
-     *      → rollLog(5) — OVERWRITES lastMark backwards from 7 to 5!
-     *      → journal file 5 was already deleted in step 2c
-     *   4. Bookie restarts: reads lastMark=5, looks for journal 5 → MISSING!
-     *      → throws "Recovery log 5 is missing"
-     * 
- * - *

This test simulates the core regression: a newer checkpoint is completed first - * (with garbage collection), then an older checkpoint attempts to overwrite lastMark - * backwards. With the monotonic fix, the older mark is skipped - * (5 < lastPersistedMark 7), so lastMark stays at 7 and the restart succeeds. + * Verifies that the monotonic guard in checkpointComplete prevents lastMark regression. + * A newer checkpoint is completed first (with garbage collection deleting old journals), + * then a stale checkpoint attempts to overwrite lastMark backwards. The fix skips the + * stale mark, keeping lastMark at the newer position so restart succeeds. */ @Test - public void testNestedCheckpointCompleteLastMarkRegression() throws Exception { + public void testConcurrentCheckpointCompleteLastMarkRegression() throws Exception { File baseDir = new File(tmpDir, "journalMissingTest"); File ledgerDir = new File(baseDir, "ledger"); File journalBaseDir = new File(baseDir, "journal"); @@ -943,25 +924,28 @@ public void testNestedCheckpointCompleteLastMarkRegression() throws Exception { CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals()); - // === Simulate the lastMark regression === + // === Simulate concurrent checkpointComplete causing lastMark regression === - // Step 1: SyncThread takes checkpoint at mark(5, 100) before calling ledgerStorage.flush() + // Step 1: SyncThread.checkpoint() captures a checkpoint before heavy flush I/O. + // This mark becomes stale during the I/O as the journal continues advancing. 
journal.getLastLogMark().getCurMark().setLogMark(5, 100); - CheckpointSource.Checkpoint outerCheckpoint = checkpointSource.newCheckpoint(); + CheckpointSource.Checkpoint staleCheckpoint = checkpointSource.newCheckpoint(); - // Step 2: Inside ledgerStorage.flush() → SingleDirectoryDbLedgerStorage.flush() - // takes a newer checkpoint at mark(7, 200) + // Step 2: During SyncThread's I/O, triggerFlushAndAddEntry fires on a separate + // thread. SingleDirectoryDbLedgerStorage.flush() captures a newer checkpoint + // (the journal has advanced to file 7 during the I/O). journal.getLastLogMark().getCurMark().setLogMark(7, 200); - CheckpointSource.Checkpoint innerCheckpoint = checkpointSource.newCheckpoint(); + CheckpointSource.Checkpoint newerCheckpoint = checkpointSource.newCheckpoint(); - // Step 3: SingleDirectoryDbLedgerStorage.flush() completes its checkpoint FIRST + // Step 3: After SyncThread releases flushMutex, the SingleDirectoryDbLedgerStorage + // executor wins the race and calls checkpointComplete first. 
// checkpointComplete(mark=7, compact=true) // rollLog to 7, garbage collects journals with id < 7 (deletes 3,4,5,6) - checkpointSource.checkpointComplete(innerCheckpoint, true); + checkpointSource.checkpointComplete(newerCheckpoint, true); - LogMark markAfterSync = readLogMark(ledgerDirMark); - assertEquals("lastMark should be at 7 after inner flush", 7, markAfterSync.getLogFileId()); - assertEquals(200, markAfterSync.getLogFileOffset()); + LogMark markAfterNewer = readLogMark(ledgerDirMark); + assertEquals("lastMark should be at 7 after newer checkpoint", 7, markAfterNewer.getLogFileId()); + assertEquals(200, markAfterNewer.getLogFileOffset()); // Verify journals 3,4,5,6 were garbage collected, 7,8 still exist for (long id = 3; id <= 6; id++) { @@ -973,12 +957,12 @@ public void testNestedCheckpointCompleteLastMarkRegression() throws Exception { assertTrue("Journal " + id + " should still exist", journalFile.exists()); } - // Step 4: Control returns to SyncThread, which completes its OLDER checkpoint - // checkpointComplete(mark=5, compact=false). The compact value does not matter here — - // the regression is caused by rollLog overwriting the lastMark file backwards. + // Step 4: SyncThread then calls checkpointComplete with its stale mark. + // The compact value does not matter — the regression is caused by rollLog + // overwriting the lastMark file backwards. // WITHOUT FIX: rollLog overwrites lastMark to 5, but journal 5 is already deleted! // WITH FIX: mark 5 < lastPersistedMark 7, so this is skipped entirely. - checkpointSource.checkpointComplete(outerCheckpoint, false); + checkpointSource.checkpointComplete(staleCheckpoint, true); // Verify: lastMark must NOT regress to 5. Should stay at 7. LogMark markAfterFlush = readLogMark(ledgerDirMark);