Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.4.0
-----
* Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129)
* Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (CASSANALYTICS-127)
* Commitlog reading not progressing in CDC due to incorrect CommitLogReader.isFullyRead (CASSANALYTICS-124)
* Incorrect hash code calculation in PartitionUpdateWrapper.Digest (CASSANALYTICS-125)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,14 @@ private void readCommitLogSegment() throws IOException
break;
}
}

// If the segment is flagged complete in the index but no end-of-segment marker was

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So my understanding of this is that we have both:
a. a path that can terminate gracefully with the LEGACY_END_OF_SEGMENT_MARKER, and
b. a path that terminates gracefully when our idx file says we're completed and we trust it

We have an issue where above in the code we have if we're < 4 bytes from the end of a segment on a legacy CommitLog - this isn't an error case at all but if we find it, we flag statusTracker to error:

                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
                // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
                // from the end of the file, which means that we'll be unable to read an a full int and instead
                // read an EOF here
                if (end - reader.getFilePointer() < 4)
                {
                    logger.trace("Not enough bytes left for another mutation in this CommitLog section, continuing");
                    statusTracker.requestTermination();
                    return;
                }

So in this case, we'll requestTermination on the statusTracker which sets:

        public void requestTermination()
        {
            error = true;
        }

That'll then run afoul of the logic you have here, since the statusTracker.shouldContinue() will return false and thus we not set position to log.maxOffset. Overloading the statusTracker.requestTermination to mean both "things are fine" and "things are failed" is making this incorrect I think.

If we augment ReadStatusTracker to have both a markAsError() method that sets error = true; and a separate markCleanCompletion() that sets another boolean, we'll cleanly work around this situation and can check the combination of:
if (log.completed() && statusTracker.noErrors() && statusTracker.isCompleted()) (or whatever better names you'd pick), then I think we'd be solid here and be able to handle both the legacy CDC path and the new index based.

// encountered, advance position to maxOffset so isFullyRead() returns true and CDC
// can move on to the next commit log.
if (log.completed() && statusTracker.noErrors())
{
this.position = (int) log.maxOffset();
}
}
// Unfortunately CommitLogSegmentReader.SegmentIterator (for-loop) cannot throw a checked exception,
// so we check to see if a RuntimeException is wrapping an IOException.
Expand Down Expand Up @@ -425,7 +433,7 @@ private void readSection(FileDataInput reader,
if (end - reader.getFilePointer() < 4)
{
logger.trace("Not enough bytes left for another mutation in this CommitLog section, continuing");
statusTracker.requestTermination();
statusTracker.markCleanCompletion();
return;
}

Expand All @@ -437,7 +445,7 @@ private void readSection(FileDataInput reader,
// Mark the log as fully consumed so isFullyRead() returns true.
// The guard above ensures this is not overridden after readSection returns.
this.position = (int) log.maxOffset();
statusTracker.requestTermination();
statusTracker.markCleanCompletion();
return;
}

Expand Down Expand Up @@ -673,10 +681,12 @@ private static class ReadStatusTracker
public String errorContext = "";
public boolean tolerateErrorsInSection;
private boolean error;
private boolean cleanCompletion;

private ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
{
this.error = false;
this.cleanCompletion = false;
this.mutationsLeft = mutationLimit;
this.tolerateErrorsInSection = tolerateErrorsInSection;
}
Expand All @@ -692,13 +702,23 @@ public void addProcessedMutation()

public boolean shouldContinue()
{
return !error && mutationsLeft != 0;
return !error && !cleanCompletion && mutationsLeft != 0;
}

public boolean noErrors()
{
return !error;
}

public void requestTermination()
{
error = true;
}

public void markCleanCompletion()
{
cleanCompletion = true;
}
}

/**
Expand Down