From 58125f5817e8731a23430169fa2a64456265a6c5 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Fri, 6 Feb 2026 11:03:58 +0000 Subject: [PATCH] fix: Eliminate intermittent failure in testQueueClosedEventTerminatesRemoteSubscribers Two-part fix for race condition in Kafka replication test: 1. EventQueue: Signal polling started BEFORE any early returns - Moved signalQueuePollerStarted() to beginning of dequeueEventItem() - Previously in finally block, which didn't execute if queue was closed - Race: If queue closed before first dequeue, EventQueueClosedException thrown BEFORE try block, so finally never ran and signal never sent - Now always signals even if queue immediately throws on closure 2. Test: Replace unreliable Thread.sleep with proper synchronization - Inject QueueManager to access the queue for the task - Replace Thread.sleep(2000) with queueManager.awaitQueuePollerStart() - Ensures EventConsumer is actually polling before sending QueueClosedEvent - Add debug logging to diagnose future failures Root cause: signalQueuePollerStarted() was only called in finally block, which was skipped when queue was already closed at first dequeue attempt. This caused awaitQueuePollerStart() to hang or timeout intermittently. --- .../KafkaReplicationIntegrationTest.java | 20 +++++++- .../java/io/a2a/server/events/EventQueue.java | 48 +++++++++---------- 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java b/extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java index c4b23fb0d..d38f8244c 100644 --- a/extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java +++ b/extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java @@ -30,7 +30,9 @@ import io.a2a.extras.queuemanager.replicated.core.ReplicatedEventQueueItem; import io.a2a.jsonrpc.common.json.JsonUtil; import io.a2a.server.PublicAgentCard; +import io.a2a.server.events.EventQueue; import io.a2a.server.events.QueueClosedEvent; +import io.a2a.server.events.QueueManager; import io.a2a.spec.A2AClientException; import io.a2a.spec.AgentCard; import io.a2a.spec.Message; @@ -46,6 +48,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Integration test for Kafka replication functionality. @@ -54,6 +58,8 @@ @QuarkusTest public class KafkaReplicationIntegrationTest { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReplicationIntegrationTest.class); + @Inject @PublicAgentCard AgentCard agentCard; @@ -65,6 +71,9 @@ public class KafkaReplicationIntegrationTest { @Channel("replicated-events-out") Emitter testEmitter; + @Inject + QueueManager queueManager; + private Client streamingClient; private Client nonStreamingClient; private Client pollingClient; @@ -367,8 +376,11 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception { // Resubscribe to the task - this creates a streaming subscription streamingClient.resubscribe(new TaskIdParams(taskId), List.of(consumer), errorHandler); - // Wait a moment to ensure the streaming subscription is fully established - Thread.sleep(2000); + // Wait for the EventConsumer to start polling (replaces unreliable Thread.sleep) + // This ensures the consumer is ready to receive the QueueClosedEvent + EventQueue queue = queueManager.get(taskId); + assertNotNull(queue, "Queue should exist for task " + taskId); + queueManager.awaitQueuePollerStart(queue); // Now manually send a QueueClosedEvent to Kafka to simulate queue closure on another node QueueClosedEvent closedEvent = new QueueClosedEvent(taskId); @@ -384,6 +396,10 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception { "Streaming subscription should complete when QueueClosedEvent is received"); // Verify the stream completed normally (not with an error) + if (!streamCompleted.get()) { + LOGGER.error("Stream did not complete normally! streamErrored={}, errorRef={}", + streamErrored.get(), errorRef.get(), errorRef.get()); + } assertTrue(streamCompleted.get(), "Stream should complete normally when QueueClosedEvent is received"); assertFalse(streamErrored.get(), "Stream should not error on QueueClosedEvent"); assertNull(errorRef.get(), "Should not receive error when stream completes gracefully"); diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java index 99f8bc2dc..122d4b0f8 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java +++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java @@ -662,6 +662,10 @@ public void enqueueLocalOnly(EventQueueItem item) { @Override @Nullable public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException { + // CRITICAL: Signal polling started BEFORE any early returns + // This ensures awaitQueuePollerStart() unblocks even if queue is already closed + signalQueuePollerStarted(); + // For immediate close: exit immediately even if queue is not empty (race with MainEventBusProcessor) // For graceful close: only exit when queue is empty (wait for all events to be consumed) // BUT: if awaiting final event, keep polling even if closed and empty @@ -672,32 +676,28 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl queue.size()); throw new EventQueueClosedException(); } - try { - if (waitMilliSeconds <= 0) { - EventQueueItem item = queue.poll(); - if (item != null) { - Event event = item.getEvent(); - LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event); - } - return item; + if (waitMilliSeconds <= 0) { + EventQueueItem item = queue.poll(); + if (item != null) { + Event event = item.getEvent(); + LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event); } - try { - LOGGER.trace("Polling ChildQueue {} (wait={}ms)", System.identityHashCode(this), waitMilliSeconds); - EventQueueItem item = queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS); - if (item != null) { - Event event = item.getEvent(); - LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event); - } else { - LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this)); - } - return item; - } catch (InterruptedException e) { - LOGGER.debug("Interrupted dequeue (waiting) {}", this); - Thread.currentThread().interrupt(); - return null; + return item; + } + try { + LOGGER.trace("Polling ChildQueue {} (wait={}ms)", System.identityHashCode(this), waitMilliSeconds); + EventQueueItem item = queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS); + if (item != null) { + Event event = item.getEvent(); + LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event); + } else { + LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this)); } - } finally { - signalQueuePollerStarted(); + return item; + } catch (InterruptedException e) { + LOGGER.debug("Interrupted dequeue (waiting) {}", this); + Thread.currentThread().interrupt(); + return null; } }