Summary of this patch (text before the first file header is commentary and
is skipped by patch tools):
  * EventConsumer gains a default no-op flush(); Cdc.run() calls it after the
    events of a micro-batch are consumed and before the end state is persisted,
    so a failed flush skips persist().
  * StatePersister gains a default no-op stop(); SidecarStatePersister.stop()
    is marked @Override, and Cdc.close() now also calls statePersister.stop().
NOTE(review): line breaks and indentation were reconstructed from a
whitespace-collapsed copy; verify against the target tree, or apply with
whitespace-insensitive matching.

diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java
index ad4c34d50..52d00813e 100644
--- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java
+++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java
@@ -134,6 +134,7 @@ public synchronized void start()
     /**
      * Stop the SidecarStatePersister gracefully, blocking to await for any pending flushes to complete.
      */
+    @Override
     public void stop()
     {
         stop(true);
diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java
index b3409cdfd..00d0b7695 100644
--- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java
+++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java
@@ -318,6 +318,18 @@ protected void run() throws NotEnoughReplicasException
                 eventConsumer.accept(event);
             }
 
+            // flush before persisting state; if delivery fails this throws,
+            // skipping persist() so the micro-batch is retried on the next run
+            try
+            {
+                eventConsumer.flush();
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted during event consumer flush", e);
+            }
+
             // persist end state
             CdcState endState = it.endState();
             persist(endState, tokenRange);
@@ -442,5 +454,6 @@ protected void refreshSchema()
     public void close()
     {
         this.stop();
+        statePersister.stop();
     }
 }
diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java
index a0ccddee6..0890b0f47 100644
--- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java
+++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java
@@ -25,4 +25,17 @@
 
 public interface EventConsumer extends Consumer
 {
+    /**
+     * Flush any pending events to the transport layer.
+     * Called after each micro-batch and before CDC state is persisted, to ensure
+     * all events have been durably delivered before the commit-log position advances.
+     * If delivery fails, implementations must throw so that state is NOT persisted
+     * and the micro-batch will be retried on the next run.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while flushing
+     */
+    default void flush() throws InterruptedException
+    {
+        // no-op by default
+    }
 }
diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/StatePersister.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/StatePersister.java
index 1e95e6e93..648a83760 100644
--- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/StatePersister.java
+++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/StatePersister.java
@@ -76,6 +76,15 @@ default CdcState loadCanonicalState(String jobId, int partitionId, @Nullable Tok
                .orElse(CdcState.BLANK);
     }
 
+    /**
+     * Stop the StatePersister, flushing any buffered state to persistent storage before returning.
+     * Implementations that buffer state asynchronously must override this to ensure no state is lost on shutdown.
+     */
+    default void stop()
+    {
+        // no-op by default
+    }
+
     /**
      * Load last CDC state from persistant storage after a bounce, restart or configuration change.
      *