
CASSANALYTICS-126: Flush event consumer before persisting CDC state to prevent data loss on failure#178

Open
jyothsnakonisa wants to merge 1 commit into apache:trunk from jyothsnakonisa:flush

Conversation

@jyothsnakonisa
Contributor

No description provided.

return;
}

asyncExecutor.cancelTimer(this.timerId);


We should switch the order here, I think. If we throw an exception during the flush() call below, timerId will already have been set to -1 at that point, allowing subsequent start() calls even though the flush failed. My thinking is we should have a flow of:

  1. flush
  2. If that succeeds, THEN we stop the timer

Since this is in a synchronized block we're not at risk of any kernel scheduling or races there, but it would be in line with the spirit of what we're trying to accomplish in this PR.
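A minimal sketch of that flow. The class and member names here (FlushFirstPersister, flush(), cancelTimer()) are simplified stand-ins for illustration, not the actual Sidecar types:

```java
// Sketch: flush first, cancel the timer only on success. If flush() throws,
// timerId is left untouched, so the persister is not left looking "stopped".
public class FlushFirstPersister
{
    private long timerId = 1;            // -1 means "stopped"
    private boolean timerCancelled = false;

    void flush()
    {
        // stand-in for persisting buffered CDC state; may throw
    }

    void cancelTimer(long id)
    {
        timerCancelled = true;
    }

    public synchronized void stop()
    {
        if (timerId == -1)
        {
            return;                      // already stopped
        }
        flush();                         // 1. flush; an exception leaves timerId intact
        cancelTimer(timerId);            // 2. flush succeeded, now stop the timer
        timerId = -1;                    // subsequent start() is allowed again
    }

    public boolean isStopped()
    {
        return timerId == -1 && timerCancelled;
    }
}
```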

Contributor Author


CdcPublisher.stop() eventually calls this stop() method in the state persister. CdcPublisher.stop() gets called in the following scenarios:

  1. TokenRanges change (so that the sidecar starts consumers for the new token ranges)
  2. Cdc and Kafka config changes.

In both scenarios, a subsequent start is called after stop.
Now the question is: if flush fails, do we not allow restart of the iterators to handle the above changes? If we don't allow restart, it will remain in the broken state forever.

With the reversed ordering, if flush() throws, cancelTimer() would never be called, leaving the periodic timer still firing, and timerId would not be reset, blocking any subsequent start() calls. So I think we should keep this order to be able to restart.


> if flush fails, do we not allow restart of iterators to handle above changes? If we don't allow restart, it will remain in the broken state forever.

Right now, if we get an OutOfMemoryError, we swallow it. java.io.IOException? Swallow it.

This code here:

   protected void persistToCassandra(boolean force)
    {
        // clean-up finished futures
        activeFlush.removeIf(wrapper -> {
            if (wrapper.allDone())
            {
                try
                {
                    wrapper.await();
                    sidecarCdcStats.capturePersistSucceeded(System.nanoTime() - wrapper.startTimeNanos);
                }
                catch (InterruptedException e)
                {
                    LOGGER.warn("Persist failed with InterruptedException", e);
                    Thread.currentThread().interrupt();
                    sidecarCdcStats.capturePersistFailed(e);
                }
                catch (Throwable throwable)
                {
                    LOGGER.warn("Persist failed", throwable);
                    sidecarCdcStats.capturePersistFailed(throwable);
                }
                return true;
            }
            return false;
        });

means that we can have all manner of nasty exceptions happen inside the wrapped persistToCassandra calls. So right now, .stop() isn't greedy in the passthrough to flushActiveSafe, but if anything goes haywire in the persistToCassandra world we'll swallow it, log about it, and keep moving along, allowing subsequent restart of the SidecarStatePersister since this.timerId will be -1.

So my question really is: do we always want the SidecarStatePersister in a "restartable" state after calling .stop(), regardless of whether there are serious underlying exceptions thrown by the activeFlush members? flushActiveSafe has allowances for ExecutionException and InterruptedException but otherwise will bubble exceptions up and leave the object in a broken state if things otherwise fail in unexpected ways. Do we not want to do the same for the state persistence?
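One possible middle ground for that cleanup loop would be to keep the log-and-continue behavior for ordinary exceptions but rethrow JVM-fatal Errors instead of absorbing every Throwable. This is only a sketch: the names (FatalAwareCleanup, the Runnable stand-in for wrapper.await()) are illustrative, not the real SidecarStatePersister internals:

```java
import java.util.List;

public class FatalAwareCleanup
{
    // Drains finished tasks the way persistToCassandra's removeIf does,
    // but lets Errors (OutOfMemoryError etc.) propagate to the caller
    // rather than swallowing them with everything else.
    public static int cleanup(List<Runnable> finished)
    {
        finished.removeIf(task -> {
            try
            {
                task.run();          // stand-in for wrapper.await()
            }
            catch (Error fatal)
            {
                throw fatal;         // fatal JVM-level error: surface it
            }
            catch (RuntimeException e)
            {
                // recoverable failure: log-and-continue, as today
            }
            return true;
        });
        return finished.size();
    }
}
```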

.collect(Collectors.toSet());
}

protected void run() throws NotEnoughReplicasException


I was tracing the code here, and it looks like the caller of run() will basically absorb all Throwables that bubble up, log them, and then continue executing. That feels like a huge problematic gap in our flow. The code I'm looking at:

    protected void runSafe(CompletableFuture<Void> future)
    {
        try
        {
            run();
            completeActiveFuture(future);
            scheduleNextRun();
        }
        catch (NotEnoughReplicasException e)
        {
            // NotEnoughReplicasException can occur when too many replicas are down
            // OR if there are no new commit logs to read if writes are idle on the cluster
            completeActiveFuture(future);
            scheduleRun(cdcOptions.sleepWhenInsufficientReplicas().toMillis());
        }
        catch (Throwable t)
        {
            completeActiveFuture(future);

            if (handleError(t))
            {
                LOGGER.warn("CdcConsumer epoch failed with recoverable error, scheduling next run jobId={} partition={} epoch={}",
                            jobId, partitionId, currentState.epoch, t);
                scheduleNextRun();
            }
            else
            {
                LOGGER.error("CdcConsumer epoch failed with unrecoverable error jobId={} partition={} epoch={}",
                             jobId, partitionId, currentState.epoch, t);
                stop();
            }
        }
    }

handleError just logs the message and keeps on trucking. OutOfMemory? We keep going. Disk failures? Keep going.

We have the JVMStabilityInspector in Cassandra that I ended up writing for just this type of problem - wrapping intelligence and logic in a central place around "is this a recoverable error or not? And should users be able to configure it as such?"

So that said - I don't think we need to go THAT far (certainly not in this patch), but just wondering if there's something we should do here w/augmenting the handleError path or the exception handling path in runSafe that's less "absorb and allow all errors" and has a little more intelligence around our exception handling situation to prevent the situation that gave rise to this ticket: data loss from exception cases.
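As a rough illustration of what "a little more intelligence" could look like, the classification below treats JVM-fatal Error types as unrecoverable while still retrying transient failures. The method name handleError matches the snippet above, but this body is an assumption sketched in the spirit of JVMStabilityInspector, not the actual implementation:

```java
public class ErrorClassifier
{
    /**
     * @return true if the consumer should schedule another run,
     *         false if it should stop (unrecoverable).
     */
    public static boolean handleError(Throwable t)
    {
        if (t instanceof OutOfMemoryError
            || t instanceof LinkageError
            || t instanceof java.io.IOError)
        {
            // JVM or storage state can no longer be trusted
            return false;
        }
        // transient failures (timeouts, connectivity, etc.): retry
        return true;
    }
}
```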

Contributor Author


I think the intention behind silently logging and continuing in case of failures is to not block retrying. However, I agree with your point that for irrecoverable errors maybe we should stop().

EX: if there is an out-of-memory exception during the run method, do we stop consumers completely? What if the memory usage goes down? How does the sidecar start consumers again in that case?

We have to think about those scenarios before changing the behavior to fail on irrecoverable errors.


> EX: if there is an out of memory exception during run method, do we stop consumers completely? what if the memory usage goes down, how does the sidecar start consumers again in that case?

In my experience, once you've hit an OOM you can no longer trust the java process you're operating in to be in a good state. Any and all manner of problems come up once you have a non-deterministic "blast" of items that failed to allocate. Similar to my comment above about ordering and greedy resetting of the this.timerId to -1 before we've actually tried to stop things and see if they shut down gracefully or explode on execution, I think we face the same risk here. Catching all Throwables in this way is "fine" if you have something where you can reasonably recover from a failure state, but what do we do here if we're in a hybrid failure state (i.e. some persisting to disk failed, some didn't)? The logic around persistToCassandra will remove an activeState even if it's in a catastrophically failing state and simply re-enqueue it for the next attempt.

This whole thing seems engineered to try and always keep the persistToCassandra futures around until we know they're durably done, so we move to just catch everything and absorb them on failure. I guess my concern is the degenerate degradation case where we end up with a half-dead zombie process because we over-allocated and then end up strewing OOM's everywhere. For instance - if we're mid OOM, the following code's not going to re-enqueue the calls to persist:

        states.stream()
              .map(this::persistToCassandra)
              .filter(Objects::nonNull)
              .forEach(activeFlush::add);

The design here of the handoff between activeFlush and states plus our greedy "catch everything" exposes us to some nasty problems. =/

public void close()
{
this.stop();
statePersister.stop();


I think we also need to stop the statePersister in the Cdc#stop method. Right now we have:

    public void stop(boolean blocking) throws ExecutionException, InterruptedException
    {
        if (isRunning.get() && isRunning.compareAndSet(true, false))
        {
            LOGGER.info("Stopping CDC Consumer jobId={} partitionId={}", jobId, partitionId);
            CompletableFuture<Void> activeFuture = active.get();
            if (activeFuture != null && blocking)
            {
                // block until active future completes
                long timeout = cdcOptions.stopTimeout().toMillis();
                try
                {
                    activeFuture.get(timeout, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException e)
                {
                    LOGGER.warn("Failed to cleanly shutdown active future after {} millis", timeout);
                    stats.cdcConsumerStopTimeout();
                }
            }
            LOGGER.info("Stopped CDC Consumer jobId={} partitionId={}", jobId, partitionId);
        }
    }

Tracing the paths that lead here, there's the exception path from runSafe that we'll never hit because handleError eats everything. So I guess the question is: if someone calls stop() on Cdc, do they also expect the StatePersister to flush and stop as well? Or should those be able to happen separately? I think it's the former and, if so, we should wire that child stop call into the parent (i.e. if Cdc stops, StatePersister stops).

Then a call to close would call Cdc#stop which would then also stop children.
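A sketch of that wiring, so every path that stops the consumer (including the unrecoverable branch in runSafe) also stops the persister. The class and field names (CdcSketch, StatePersister, running) are simplified stand-ins, not the real Sidecar classes:

```java
public class CdcSketch
{
    public interface StatePersister
    {
        void stop();
    }

    private final StatePersister statePersister;
    private boolean running = true;

    public CdcSketch(StatePersister statePersister)
    {
        this.statePersister = statePersister;
    }

    public void stop()
    {
        if (running)
        {
            running = false;
            // parent stop always cascades to the child
            statePersister.stop();
        }
    }

    public void close()
    {
        stop();   // close is now just stop(); no separate child call needed
    }
}
```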

Contributor Author


Yes, you are right, statePersister.stop() should be called when closing CDC. If you look closely, there is another method which does that. That said, having a public stop method in Cdc is confusing. I will make the stop method in Cdc private, so that only close() is available, which does both Cdc.stop() and statePersister.stop():

    public void close()
    {
        this.stop();
        statePersister.stop();
    }


In runSafe() we have the following:

            else
            {
                LOGGER.error("CdcConsumer epoch failed with unrecoverable error jobId={} partition={} epoch={}",
                             jobId, partitionId, currentState.epoch, t);
                stop();
            }

That seems to go through the stop() path w/out triggering the statePersister stop? Best I can tell, the runSafe path still has a hole there.
