Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import io.a2a.extras.queuemanager.replicated.core.ReplicatedEventQueueItem;
import io.a2a.jsonrpc.common.json.JsonUtil;
import io.a2a.server.PublicAgentCard;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.QueueClosedEvent;
import io.a2a.server.events.QueueManager;
import io.a2a.spec.A2AClientException;
import io.a2a.spec.AgentCard;
import io.a2a.spec.Message;
Expand All @@ -46,6 +48,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Integration test for Kafka replication functionality.
Expand All @@ -54,6 +58,8 @@
@QuarkusTest
public class KafkaReplicationIntegrationTest {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReplicationIntegrationTest.class);

@Inject
@PublicAgentCard
AgentCard agentCard;
Expand All @@ -65,6 +71,9 @@ public class KafkaReplicationIntegrationTest {
@Channel("replicated-events-out")
Emitter<String> testEmitter;

@Inject
QueueManager queueManager;

private Client streamingClient;
private Client nonStreamingClient;
private Client pollingClient;
Expand Down Expand Up @@ -367,8 +376,11 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception {
// Resubscribe to the task - this creates a streaming subscription
streamingClient.resubscribe(new TaskIdParams(taskId), List.of(consumer), errorHandler);

// Wait a moment to ensure the streaming subscription is fully established
Thread.sleep(2000);
// Wait for the EventConsumer to start polling (replaces unreliable Thread.sleep)
// This ensures the consumer is ready to receive the QueueClosedEvent
EventQueue queue = queueManager.get(taskId);
assertNotNull(queue, "Queue should exist for task " + taskId);
queueManager.awaitQueuePollerStart(queue);

// Now manually send a QueueClosedEvent to Kafka to simulate queue closure on another node
QueueClosedEvent closedEvent = new QueueClosedEvent(taskId);
Expand All @@ -384,6 +396,10 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception {
"Streaming subscription should complete when QueueClosedEvent is received");

// Verify the stream completed normally (not with an error)
if (!streamCompleted.get()) {
LOGGER.error("Stream did not complete normally! streamErrored={}, errorRef={}",
streamErrored.get(), errorRef.get(), errorRef.get());
}
assertTrue(streamCompleted.get(), "Stream should complete normally when QueueClosedEvent is received");
assertFalse(streamErrored.get(), "Stream should not error on QueueClosedEvent");
assertNull(errorRef.get(), "Should not receive error when stream completes gracefully");
Expand Down
48 changes: 24 additions & 24 deletions server-common/src/main/java/io/a2a/server/events/EventQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,10 @@ public void enqueueLocalOnly(EventQueueItem item) {
@Override
@Nullable
public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException {
// CRITICAL: Signal polling started BEFORE any early returns
// This ensures awaitQueuePollerStart() unblocks even if queue is already closed
signalQueuePollerStarted();

// For immediate close: exit immediately even if queue is not empty (race with MainEventBusProcessor)
// For graceful close: only exit when queue is empty (wait for all events to be consumed)
// BUT: if awaiting final event, keep polling even if closed and empty
Expand All @@ -672,32 +676,28 @@ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueCl
queue.size());
throw new EventQueueClosedException();
}
try {
if (waitMilliSeconds <= 0) {
EventQueueItem item = queue.poll();
if (item != null) {
Event event = item.getEvent();
LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event);
}
return item;
if (waitMilliSeconds <= 0) {
EventQueueItem item = queue.poll();
if (item != null) {
Event event = item.getEvent();
LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event);
}
try {
LOGGER.trace("Polling ChildQueue {} (wait={}ms)", System.identityHashCode(this), waitMilliSeconds);
EventQueueItem item = queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS);
if (item != null) {
Event event = item.getEvent();
LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
} else {
LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
}
return item;
} catch (InterruptedException e) {
LOGGER.debug("Interrupted dequeue (waiting) {}", this);
Thread.currentThread().interrupt();
return null;
return item;
}
try {
LOGGER.trace("Polling ChildQueue {} (wait={}ms)", System.identityHashCode(this), waitMilliSeconds);
EventQueueItem item = queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS);
if (item != null) {
Event event = item.getEvent();
LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
} else {
LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
}
} finally {
signalQueuePollerStarted();
return item;
} catch (InterruptedException e) {
LOGGER.debug("Interrupted dequeue (waiting) {}", this);
Thread.currentThread().interrupt();
return null;
}
}

Expand Down