diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 2d68b8d2da8..76f5ae6ad57 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -623,6 +623,10 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour private final LastLogMark lastLogMark = new LastLogMark(0, 0); + // Ensures lastMark only advances forward across concurrent checkpointComplete calls. + private final Object checkpointLock = new Object(); + private final LogMark lastPersistedMark = new LogMark(0, 0); + private static final String LAST_MARK_DEFAULT_NAME = "lastMark"; private final String lastMarkFileName; @@ -766,6 +770,9 @@ public Checkpoint newCheckpoint() { /** * Telling journal a checkpoint is finished. * + *
Skips if the checkpoint is older than the last persisted mark,
+ * preventing lastMark from regressing.
+ *
* @throws IOException
*/
@Override
@@ -776,6 +783,15 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint) checkpoint;
LastLogMark mark = lmcheckpoint.mark;
+ // Skip if this mark is older than what was already persisted.
+ synchronized (checkpointLock) {
+ if (mark.getCurMark().compare(lastPersistedMark) < 0) {
+ return;
+ }
+ lastPersistedMark.setLogMark(
+ mark.getCurMark().getLogFileId(), mark.getCurMark().getLogFileOffset());
+ }
+
mark.rollLog(mark);
if (compact) {
// list the journals that have been marked
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index dfc2459678b..6ab136e1349 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -45,6 +45,7 @@
import org.apache.bookkeeper.bookie.CheckpointSourceList;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.LogMark;
@@ -881,4 +882,114 @@ public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedger
bookie.getLedgerStorage().flush();
Assert.assertEquals(pendingDeletedLedgers.size(), 0);
}
+
+ /**
+ * Verifies that the monotonic guard in checkpointComplete prevents lastMark regression.
+ * A newer checkpoint is completed first (with garbage collection deleting old journals),
+ * then a stale checkpoint attempts to overwrite lastMark backwards. The fix skips the
+ * stale mark, keeping lastMark at the newer position so restart succeeds.
+ */
+ @Test
+ public void testConcurrentCheckpointCompleteLastMarkRegression() throws Exception {
+ File baseDir = new File(tmpDir, "journalMissingTest");
+ File ledgerDir = new File(baseDir, "ledger");
+ File journalBaseDir = new File(baseDir, "journal");
+ ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+ conf.setGcWaitTime(1000);
+ conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+ conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+ conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+ conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
+ conf.setJournalDirName(journalBaseDir.getCanonicalPath());
+ // Set maxBackupJournals to 0 so garbage collection aggressively deletes all old journals
+ conf.setMaxBackupJournals(0);
+
+ BookieImpl bookie = new TestBookieImpl(conf);
+ try {
+ Journal journal = bookie.getJournals().get(0);
+ File journalDir = journal.getJournalDirectory();
+ File ledgerDirMark = new File(ledgerDir + "/current", "lastMark");
+
+ // Create fake journal files: 3.txn, 4.txn, 5.txn, 6.txn, 7.txn, 8.txn
+ for (long id = 3; id <= 8; id++) {
+ File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
+ assertTrue("Failed to create journal file " + id, journalFile.createNewFile());
+ }
+
+ // Verify all journal files exist
+ for (long id = 3; id <= 8; id++) {
+ File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
+ assertTrue("Journal file " + id + " should exist", journalFile.exists());
+ }
+
+ CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
+
+ // === Simulate concurrent checkpointComplete causing lastMark regression ===
+
+ // Step 1: SyncThread.checkpoint() captures a checkpoint before heavy flush I/O.
+ // This mark becomes stale during the I/O as the journal continues advancing.
+ journal.getLastLogMark().getCurMark().setLogMark(5, 100);
+ CheckpointSource.Checkpoint staleCheckpoint = checkpointSource.newCheckpoint();
+
+ // Step 2: During SyncThread's I/O, triggerFlushAndAddEntry fires on a separate
+ // thread. SingleDirectoryDbLedgerStorage.flush() captures a newer checkpoint
+ // (the journal has advanced to file 7 during the I/O).
+ journal.getLastLogMark().getCurMark().setLogMark(7, 200);
+ CheckpointSource.Checkpoint newerCheckpoint = checkpointSource.newCheckpoint();
+
+ // Step 3: After SyncThread releases flushMutex, the SingleDirectoryDbLedgerStorage
+ // executor wins the race and calls checkpointComplete first.
+ // checkpointComplete(mark=7, compact=true)
+ // rollLog to 7, garbage collects journals with id < 7 (deletes 3,4,5,6)
+ checkpointSource.checkpointComplete(newerCheckpoint, true);
+
+ LogMark markAfterNewer = readLogMark(ledgerDirMark);
+ assertEquals("lastMark should be at 7 after newer checkpoint", 7, markAfterNewer.getLogFileId());
+ assertEquals(200, markAfterNewer.getLogFileOffset());
+
+ // Verify journals 3,4,5,6 were garbage collected, 7,8 still exist
+ for (long id = 3; id <= 6; id++) {
+ File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
+ assertFalse("Journal " + id + " should have been garbage collected", journalFile.exists());
+ }
+ for (long id = 7; id <= 8; id++) {
+ File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
+ assertTrue("Journal " + id + " should still exist", journalFile.exists());
+ }
+
+ // Step 4: SyncThread then calls checkpointComplete with its stale mark.
+ // The compact value does not matter — the regression is caused by rollLog
+ // overwriting the lastMark file backwards.
+ // WITHOUT FIX: rollLog overwrites lastMark to 5, but journal 5 is already deleted!
+ // WITH FIX: mark 5 < lastPersistedMark 7, so this is skipped entirely.
+ checkpointSource.checkpointComplete(staleCheckpoint, true);
+
+ // Verify: lastMark must NOT regress to 5. Should stay at 7.
+ LogMark markAfterFlush = readLogMark(ledgerDirMark);
+ assertEquals("lastMark must not regress after older checkpoint completes",
+ 7, markAfterFlush.getLogFileId());
+ assertEquals(200, markAfterFlush.getLogFileOffset());
+
+ // Step 5: Simulate bookie restart — reset curMark to (0,0) first so readLog()
+ // actually loads from disk rather than being a no-op due to the in-memory value.
+ journal.getLastLogMark().getCurMark().setLogMark(0, 0);
+ journal.getLastLogMark().readLog();
+ LogMark restartMark = journal.getLastLogMark().getCurMark();
+ assertEquals("Reloaded lastMark should be 7", 7, restartMark.getLogFileId());
+
+ // The journal file pointed to by lastMark must exist
+ File markedJournal = new File(journalDir,
+ Long.toHexString(restartMark.getLogFileId()) + ".txn");
+ assertTrue("Journal file " + restartMark.getLogFileId() + " must exist for recovery",
+ markedJournal.exists());
+
+ // Verify that listJournalIds finds the expected journal for replay
+ List