Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,10 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour

private final LastLogMark lastLogMark = new LastLogMark(0, 0);

// Ensures lastMark only advances forward across concurrent checkpointComplete calls.
private final Object checkpointLock = new Object();
private final LogMark lastPersistedMark = new LogMark(0, 0);

private static final String LAST_MARK_DEFAULT_NAME = "lastMark";

private final String lastMarkFileName;
Expand Down Expand Up @@ -766,6 +770,9 @@ public Checkpoint newCheckpoint() {
/**
* Telling journal a checkpoint is finished.
*
* <p>Skips if the checkpoint is not newer than the last persisted mark,
* preventing lastMark from regressing backwards.
*
* @throws IOException
*/
@Override
Expand All @@ -776,6 +783,15 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
LogMarkCheckpoint lmcheckpoint = (LogMarkCheckpoint) checkpoint;
LastLogMark mark = lmcheckpoint.mark;

// Skip if this mark is not newer than what was already persisted.
synchronized (checkpointLock) {
if (mark.getCurMark().compare(lastPersistedMark) < 0) {
return;
}
lastPersistedMark.setLogMark(
mark.getCurMark().getLogFileId(), mark.getCurMark().getLogFileOffset());
}

mark.rollLog(mark);
Comment thread
void-ptr974 marked this conversation as resolved.
if (compact) {
// list the journals that have been marked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.bookkeeper.bookie.CheckpointSourceList;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.LogMark;
Expand Down Expand Up @@ -881,4 +882,114 @@ public void testSingleLedgerDirectoryCheckpointTriggerRemovePendingDeletedLedger
bookie.getLedgerStorage().flush();
Assert.assertEquals(pendingDeletedLedgers.size(), 0);
}

/**
* Verifies that the monotonic guard in checkpointComplete prevents lastMark regression.
* A newer checkpoint is completed first (with garbage collection deleting old journals),
* then a stale checkpoint attempts to overwrite lastMark backwards. The fix skips the
* stale mark, keeping lastMark at the newer position so restart succeeds.
*/
@Test
public void testConcurrentCheckpointCompleteLastMarkRegression() throws Exception {
File baseDir = new File(tmpDir, "journalMissingTest");
File ledgerDir = new File(baseDir, "ledger");
File journalBaseDir = new File(baseDir, "journal");
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setGcWaitTime(1000);
conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
conf.setJournalDirName(journalBaseDir.getCanonicalPath());
// Set maxBackupJournals to 0 so garbage collection aggressively deletes all old journals
conf.setMaxBackupJournals(0);

BookieImpl bookie = new TestBookieImpl(conf);
try {
Journal journal = bookie.getJournals().get(0);
File journalDir = journal.getJournalDirectory();
File ledgerDirMark = new File(ledgerDir + "/current", "lastMark");

// Create fake journal files: 3.txn, 4.txn, 5.txn, 6.txn, 7.txn, 8.txn
for (long id = 3; id <= 8; id++) {
File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
assertTrue("Failed to create journal file " + id, journalFile.createNewFile());
}

// Verify all journal files exist
for (long id = 3; id <= 8; id++) {
File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
assertTrue("Journal file " + id + " should exist", journalFile.exists());
}

CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());

// === Simulate concurrent checkpointComplete causing lastMark regression ===

// Step 1: SyncThread.checkpoint() captures a checkpoint before heavy flush I/O.
// This mark becomes stale during the I/O as the journal continues advancing.
journal.getLastLogMark().getCurMark().setLogMark(5, 100);
CheckpointSource.Checkpoint staleCheckpoint = checkpointSource.newCheckpoint();

// Step 2: During SyncThread's I/O, triggerFlushAndAddEntry fires on a separate
// thread. SingleDirectoryDbLedgerStorage.flush() captures a newer checkpoint
// (the journal has advanced to file 7 during the I/O).
journal.getLastLogMark().getCurMark().setLogMark(7, 200);
CheckpointSource.Checkpoint newerCheckpoint = checkpointSource.newCheckpoint();

// Step 3: After SyncThread releases flushMutex, the SingleDirectoryDbLedgerStorage
// executor wins the race and calls checkpointComplete first.
// checkpointComplete(mark=7, compact=true)
// rollLog to 7, garbage collects journals with id < 7 (deletes 3,4,5,6)
checkpointSource.checkpointComplete(newerCheckpoint, true);

LogMark markAfterNewer = readLogMark(ledgerDirMark);
assertEquals("lastMark should be at 7 after newer checkpoint", 7, markAfterNewer.getLogFileId());
assertEquals(200, markAfterNewer.getLogFileOffset());

// Verify journals 3,4,5,6 were garbage collected, 7,8 still exist
for (long id = 3; id <= 6; id++) {
File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
assertFalse("Journal " + id + " should have been garbage collected", journalFile.exists());
}
for (long id = 7; id <= 8; id++) {
File journalFile = new File(journalDir, Long.toHexString(id) + ".txn");
assertTrue("Journal " + id + " should still exist", journalFile.exists());
}

// Step 4: SyncThread then calls checkpointComplete with its stale mark.
// The compact value does not matter — the regression is caused by rollLog
// overwriting the lastMark file backwards.
// WITHOUT FIX: rollLog overwrites lastMark to 5, but journal 5 is already deleted!
// WITH FIX: mark 5 < lastPersistedMark 7, so this is skipped entirely.
checkpointSource.checkpointComplete(staleCheckpoint, true);

Comment thread
void-ptr974 marked this conversation as resolved.
// Verify: lastMark must NOT regress to 5. Should stay at 7.
LogMark markAfterFlush = readLogMark(ledgerDirMark);
assertEquals("lastMark must not regress after older checkpoint completes",
7, markAfterFlush.getLogFileId());
assertEquals(200, markAfterFlush.getLogFileOffset());

// Step 5: Simulate bookie restart — reset curMark to (0,0) first so readLog()
// actually loads from disk rather than being a no-op due to the in-memory value.
journal.getLastLogMark().getCurMark().setLogMark(0, 0);
journal.getLastLogMark().readLog();
LogMark restartMark = journal.getLastLogMark().getCurMark();
assertEquals("Reloaded lastMark should be 7", 7, restartMark.getLogFileId());

// The journal file pointed to by lastMark must exist
File markedJournal = new File(journalDir,
Long.toHexString(restartMark.getLogFileId()) + ".txn");
assertTrue("Journal file " + restartMark.getLogFileId() + " must exist for recovery",
markedJournal.exists());

// Verify that listJournalIds finds the expected journal for replay
List<Long> replayLogs = Journal.listJournalIds(journalDir,
journalId -> journalId >= restartMark.getLogFileId());
assertTrue("Replay journal list must contain the marked journal",
replayLogs.size() > 0 && replayLogs.get(0) == restartMark.getLogFileId());
} finally {
bookie.getLedgerStorage().shutdown();
Comment thread
void-ptr974 marked this conversation as resolved.
}
}
}
Loading