diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 2d68b8d2da8..76f5ae6ad57 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -623,6 +623,10 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour private final LastLogMark lastLogMark = new LastLogMark(0, 0); + // Ensures lastMark only advances forward across concurrent checkpointComplete calls. + private final Object checkpointLock = new Object(); + private final LogMark lastPersistedMark = new LogMark(0, 0); + private static final String LAST_MARK_DEFAULT_NAME = "lastMark"; private final String lastMarkFileName; @@ -766,6 +770,9 @@ public Checkpoint newCheckpoint() { /** * Telling journal a checkpoint is finished. * + *

Skips if the checkpoint is not newer than the last persisted mark, + * preventing lastMark from regressing backwards. + * * @throws IOException */ @Override @@ -776,6 +783,15 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint) checkpoint; LastLogMark mark = lmcheckpoint.mark; + // Skip if this mark is not newer than what was already persisted. + synchronized (checkpointLock) { + if (mark.getCurMark().compare(lastPersistedMark) < 0) { + return; + } + lastPersistedMark.setLogMark( + mark.getCurMark().getLogFileId(), mark.getCurMark().getLogFileOffset()); + } + mark.rollLog(mark); if (compact) { // list the journals that have been marked diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index dfc2459678b..6ab136e1349 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.bookie.CheckpointSourceList; import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.EntryLocation; +import org.apache.bookkeeper.bookie.Journal; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.LedgerStorage; import org.apache.bookkeeper.bookie.LogMark; @@ -881,4 +882,114 @@ public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedger bookie.getLedgerStorage().flush(); Assert.assertEquals(pendingDeletedLedgers.size(), 0); } + + /** + * Verifies that the monotonic guard in checkpointComplete prevents lastMark regression. + * A newer checkpoint is completed first (with garbage collection deleting old journals), + * then a stale checkpoint attempts to overwrite lastMark backwards. The fix skips the + * stale mark, keeping lastMark at the newer position so restart succeeds. + */ + @Test + public void testConcurrentCheckpointCompleteLastMarkRegression() throws Exception { + File baseDir = new File(tmpDir, "journalMissingTest"); + File ledgerDir = new File(baseDir, "ledger"); + File journalBaseDir = new File(baseDir, "journal"); + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(1000); + conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4); + conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() }); + conf.setJournalDirName(journalBaseDir.getCanonicalPath()); + // Set maxBackupJournals to 0 so garbage collection aggressively deletes all old journals + conf.setMaxBackupJournals(0); + + BookieImpl bookie = new TestBookieImpl(conf); + try { + Journal journal = bookie.getJournals().get(0); + File journalDir = journal.getJournalDirectory(); + File ledgerDirMark = new File(ledgerDir + "/current", "lastMark"); + + // Create fake journal files: 3.txn, 4.txn, 5.txn, 6.txn, 7.txn, 8.txn + for (long id = 3; id <= 8; id++) { + File journalFile = new File(journalDir, Long.toHexString(id) + ".txn"); + assertTrue("Failed to create journal file " + id, journalFile.createNewFile()); + } + + // Verify all journal files exist + for (long id = 3; id <= 8; id++) { + File journalFile = new File(journalDir, Long.toHexString(id) + ".txn"); + assertTrue("Journal file " + id + " should exist", journalFile.exists()); + } + + CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals()); + + // === Simulate concurrent checkpointComplete causing lastMark regression === + + // Step 1: SyncThread.checkpoint() captures a checkpoint before heavy flush I/O. + // This mark becomes stale during the I/O as the journal continues advancing. + journal.getLastLogMark().getCurMark().setLogMark(5, 100); + CheckpointSource.Checkpoint staleCheckpoint = checkpointSource.newCheckpoint(); + + // Step 2: During SyncThread's I/O, triggerFlushAndAddEntry fires on a separate + // thread. SingleDirectoryDbLedgerStorage.flush() captures a newer checkpoint + // (the journal has advanced to file 7 during the I/O). + journal.getLastLogMark().getCurMark().setLogMark(7, 200); + CheckpointSource.Checkpoint newerCheckpoint = checkpointSource.newCheckpoint(); + + // Step 3: After SyncThread releases flushMutex, the SingleDirectoryDbLedgerStorage + // executor wins the race and calls checkpointComplete first. + // checkpointComplete(mark=7, compact=true) + // rollLog to 7, garbage collects journals with id < 7 (deletes 3,4,5,6) + checkpointSource.checkpointComplete(newerCheckpoint, true); + + LogMark markAfterNewer = readLogMark(ledgerDirMark); + assertEquals("lastMark should be at 7 after newer checkpoint", 7, markAfterNewer.getLogFileId()); + assertEquals(200, markAfterNewer.getLogFileOffset()); + + // Verify journals 3,4,5,6 were garbage collected, 7,8 still exist + for (long id = 3; id <= 6; id++) { + File journalFile = new File(journalDir, Long.toHexString(id) + ".txn"); + assertFalse("Journal " + id + " should have been garbage collected", journalFile.exists()); + } + for (long id = 7; id <= 8; id++) { + File journalFile = new File(journalDir, Long.toHexString(id) + ".txn"); + assertTrue("Journal " + id + " should still exist", journalFile.exists()); + } + + // Step 4: SyncThread then calls checkpointComplete with its stale mark. + // The compact value does not matter — the regression is caused by rollLog + // overwriting the lastMark file backwards. + // WITHOUT FIX: rollLog overwrites lastMark to 5, but journal 5 is already deleted! + // WITH FIX: mark 5 < lastPersistedMark 7, so this is skipped entirely. + checkpointSource.checkpointComplete(staleCheckpoint, true); + + // Verify: lastMark must NOT regress to 5. Should stay at 7. + LogMark markAfterFlush = readLogMark(ledgerDirMark); + assertEquals("lastMark must not regress after older checkpoint completes", + 7, markAfterFlush.getLogFileId()); + assertEquals(200, markAfterFlush.getLogFileOffset()); + + // Step 5: Simulate bookie restart — reset curMark to (0,0) first so readLog() + // actually loads from disk rather than being a no-op due to the in-memory value. + journal.getLastLogMark().getCurMark().setLogMark(0, 0); + journal.getLastLogMark().readLog(); + LogMark restartMark = journal.getLastLogMark().getCurMark(); + assertEquals("Reloaded lastMark should be 7", 7, restartMark.getLogFileId()); + + // The journal file pointed to by lastMark must exist + File markedJournal = new File(journalDir, + Long.toHexString(restartMark.getLogFileId()) + ".txn"); + assertTrue("Journal file " + restartMark.getLogFileId() + " must exist for recovery", + markedJournal.exists()); + + // Verify that listJournalIds finds the expected journal for replay + List replayLogs = Journal.listJournalIds(journalDir, + journalId -> journalId >= restartMark.getLogFileId()); + assertTrue("Replay journal list must contain the marked journal", + replayLogs.size() > 0 && replayLogs.get(0) == restartMark.getLogFileId()); + } finally { + bookie.getLedgerStorage().shutdown(); + } + } }