From a14f6f195ca88f789cf06bce5f7c0042d07df53c Mon Sep 17 00:00:00 2001 From: jkonisa Date: Wed, 11 Mar 2026 17:51:35 -0700 Subject: [PATCH] CASSANALYTICS-129 : Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader --- CHANGES.txt | 1 + .../commitlog/BufferingCommitLogReader.java | 26 ++++++++++++++++--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e987053d..a4bca96e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.4.0 ----- + * Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129) * Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (CASSANALYTICS-127) * Commitlog reading not progressing in CDC due to incorrect CommitLogReader.isFullyRead (CASSANALYTICS-124) * Incorrect hash code calculation in PartitionUpdateWrapper.Digest (CASSANALYTICS-125) diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java index d6476087..b5e65f4b 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java @@ -316,6 +316,14 @@ private void readCommitLogSegment() throws IOException break; } } + + // If the segment is flagged complete in the index but no end-of-segment marker was + // encountered, advance position to maxOffset so isFullyRead() returns true and CDC + // can move on to the next commit log. + if (log.completed() && statusTracker.noErrors()) + { + this.position = (int) log.maxOffset(); + } } // Unfortunately CommitLogSegmentReader.SegmentIterator (for-loop) cannot throw a checked exception, // so we check to see if a RuntimeException is wrapping an IOException. @@ -425,7 +433,7 @@ private void readSection(FileDataInput reader, if (end - reader.getFilePointer() < 4) { logger.trace("Not enough bytes left for another mutation in this CommitLog section, continuing"); - statusTracker.requestTermination(); + statusTracker.markCleanCompletion(); return; } @@ -437,7 +445,7 @@ private void readSection(FileDataInput reader, // Mark the log as fully consumed so isFullyRead() returns true. // The guard above ensures this is not overridden after readSection returns. this.position = (int) log.maxOffset(); - statusTracker.requestTermination(); + statusTracker.markCleanCompletion(); return; } @@ -673,10 +681,12 @@ private static class ReadStatusTracker public String errorContext = ""; public boolean tolerateErrorsInSection; private boolean error; + private boolean cleanCompletion; private ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection) { this.error = false; + this.cleanCompletion = false; this.mutationsLeft = mutationLimit; this.tolerateErrorsInSection = tolerateErrorsInSection; } @@ -692,13 +702,23 @@ public void addProcessedMutation() public boolean shouldContinue() { - return !error && mutationsLeft != 0; + return !error && !cleanCompletion && mutationsLeft != 0; + } + + public boolean noErrors() + { + return !error; } public void requestTermination() { error = true; } + + public void markCleanCompletion() + { + cleanCompletion = true; + } } /**