diff --git a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java index 7ca65cb35..b3d51ea4a 100644 --- a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java +++ b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java @@ -299,15 +299,15 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) { .putHeader(CONTENT_TYPE, APPLICATION_JSON) .end(serializeResponse(error)); } else if (streaming) { - final Multi> finalStreamingResponse = streamingResponse; - executor.execute(() -> { - // Convert Multi to Multi with SSE formatting - AtomicLong eventIdCounter = new AtomicLong(0); - Multi sseEvents = finalStreamingResponse - .map(response -> SseFormatter.formatResponseAsSSE(response, eventIdCounter.getAndIncrement())); - // Write SSE-formatted strings to HTTP response - MultiSseSupport.writeSseStrings(sseEvents, rc, context); - }); + // Convert Multi to Multi with SSE formatting + // CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer + // starts emitting events before MultiSseSupport subscribes. The executor.execute() + // wrapper caused 100-600ms delays before subscription, causing events to be lost. + AtomicLong eventIdCounter = new AtomicLong(0); + Multi sseEvents = streamingResponse + .map(response -> SseFormatter.formatResponseAsSSE(response, eventIdCounter.getAndIncrement())); + // Write SSE-formatted strings to HTTP response + MultiSseSupport.writeSseStrings(sseEvents, rc, context); } else { rc.response() @@ -783,7 +783,17 @@ public void onNext(String sseEvent) { if (headers.get(CONTENT_TYPE) == null) { headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); } + // Additional SSE headers to prevent buffering + headers.set("Cache-Control", "no-cache"); + headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering response.setChunked(true); + + // CRITICAL: Disable write queue max size to prevent buffering + // Vert.x buffers writes by default - we need immediate flushing for SSE + response.setWriteQueueMaxSize(1); + + // Send initial SSE comment to kickstart the stream + response.write(": SSE stream started\n\n"); } // Write SSE-formatted string to response diff --git a/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java b/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java index 03bdbd5e5..94deae2b3 100644 --- a/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java +++ b/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java @@ -221,15 +221,15 @@ public void sendMessageStreaming(@Body String body, RoutingContext rc) { if (error != null) { sendResponse(rc, error); } else if (streamingResponse != null) { - final HTTPRestStreamingResponse finalStreamingResponse = streamingResponse; - executor.execute(() -> { - // Convert Flow.Publisher (JSON) to Multi (SSE-formatted) - AtomicLong eventIdCounter = new AtomicLong(0); - Multi sseEvents = Multi.createFrom().publisher(finalStreamingResponse.getPublisher()) - .map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement())); - // Write SSE-formatted strings to HTTP response - MultiSseSupport.writeSseStrings(sseEvents, rc, context); - }); + // Convert Flow.Publisher (JSON) to Multi (SSE-formatted) + // CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer + // starts emitting events before MultiSseSupport subscribes. The executor.execute() + // wrapper caused 100-600ms delays before subscription, causing events to be lost. + AtomicLong eventIdCounter = new AtomicLong(0); + Multi sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher()) + .map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement())); + // Write SSE-formatted strings to HTTP response + MultiSseSupport.writeSseStrings(sseEvents, rc, context); } } } @@ -431,15 +431,15 @@ public void subscribeToTask(RoutingContext rc) { if (error != null) { sendResponse(rc, error); } else if (streamingResponse != null) { - final HTTPRestStreamingResponse finalStreamingResponse = streamingResponse; - executor.execute(() -> { - // Convert Flow.Publisher (JSON) to Multi (SSE-formatted) - AtomicLong eventIdCounter = new AtomicLong(0); - Multi sseEvents = Multi.createFrom().publisher(finalStreamingResponse.getPublisher()) - .map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement())); - // Write SSE-formatted strings to HTTP response - MultiSseSupport.writeSseStrings(sseEvents, rc, context); - }); + // Convert Flow.Publisher (JSON) to Multi (SSE-formatted) + // CRITICAL: Subscribe synchronously to avoid race condition where EventConsumer + // starts emitting events before MultiSseSupport subscribes. The executor.execute() + // wrapper caused 100-600ms delays before subscription, causing events to be lost. + AtomicLong eventIdCounter = new AtomicLong(0); + Multi sseEvents = Multi.createFrom().publisher(streamingResponse.getPublisher()) + .map(json -> SseFormatter.formatJsonAsSSE(json, eventIdCounter.getAndIncrement())); + // Write SSE-formatted strings to HTTP response + MultiSseSupport.writeSseStrings(sseEvents, rc, context); } } } diff --git a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java index 0577e4f28..bfa0480a7 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java +++ b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java @@ -1,5 +1,6 @@ package io.a2a.server.events; +import java.util.concurrent.Executor; import java.util.concurrent.Flow; import io.a2a.spec.A2AError; @@ -19,22 +20,31 @@ public class EventConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(EventConsumer.class); private final EventQueue queue; + private final Executor executor; private volatile @Nullable Throwable error; private volatile boolean cancelled = false; private volatile boolean agentCompleted = false; private volatile int pollTimeoutsAfterAgentCompleted = 0; private volatile @Nullable TaskState lastSeenTaskState = null; + private volatile int pollTimeoutsWhileAwaitingFinal = 0; private static final String ERROR_MSG = "Agent did not return any response"; private static final int NO_WAIT = -1; private static final int QUEUE_WAIT_MILLISECONDS = 500; // In replicated scenarios, events can arrive hundreds of milliseconds after local agent completes // Grace period allows Kafka replication to deliver late-arriving events - // 3 timeouts * 500ms = 1500ms grace period for replication delays + // Calculation: MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED * QUEUE_WAIT_MILLISECONDS = 1500ms private static final int MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED = 3; + // Maximum time to wait for final event when awaitingFinalEvent is set + // If event doesn't arrive after this many timeouts, assume it won't arrive + // Calculation: MAX_POLL_TIMEOUTS_AWAITING_FINAL * QUEUE_WAIT_MILLISECONDS = 3000ms + private static final int MAX_AWAITING_FINAL_TIMEOUT_MS = 3000; + private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL = + MAX_AWAITING_FINAL_TIMEOUT_MS / QUEUE_WAIT_MILLISECONDS; - public EventConsumer(EventQueue queue) { + public EventConsumer(EventQueue queue, Executor executor) { this.queue = queue; + this.executor = executor; LOGGER.debug("EventConsumer created with queue {}", System.identityHashCode(queue)); } @@ -51,9 +61,12 @@ public Flow.Publisher consumeAll() { .withBackpressureStrategy(BackpressureStrategy.BUFFER) .withBufferSize(256); return ZeroPublisher.create(conf, tube -> { - boolean completed = false; - try { - while (true) { + // CRITICAL: Spawn polling loop on executor to avoid blocking the calling thread + // The lambda returns immediately, but polling continues on separate thread + executor.execute(() -> { + boolean completed = false; + try { + while (true) { // Check if cancelled by client disconnect if (cancelled) { LOGGER.debug("EventConsumer detected cancellation, exiting polling loop for queue {}", System.identityHashCode(queue)); @@ -82,8 +95,9 @@ public Flow.Publisher consumeAll() { item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS); if (item == null) { int queueSize = queue.size(); - LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, timeoutCount={}", - agentCompleted, queueSize, pollTimeoutsAfterAgentCompleted); + boolean awaitingFinal = queue.isAwaitingFinalEvent(); + LOGGER.debug("EventConsumer poll timeout (null item), agentCompleted={}, queue.size()={}, awaitingFinalEvent={}, timeoutCount={}, awaitingTimeoutCount={}", + agentCompleted, queueSize, awaitingFinal, pollTimeoutsAfterAgentCompleted, pollTimeoutsWhileAwaitingFinal); // If agent completed, a poll timeout means no more events are coming // MainEventBusProcessor has 500ms to distribute events from MainEventBus // If we timeout with agentCompleted=true, all events have been distributed @@ -94,8 +108,38 @@ public Flow.Publisher consumeAll() { // CRITICAL: Do NOT close if task is in interrupted state (INPUT_REQUIRED, AUTH_REQUIRED) // Per A2A spec, interrupted states are NOT terminal - the stream must stay open // for future state updates even after agent completes (agent will be re-invoked later). + // + // CRITICAL: Don't start timeout counter if we're awaiting a final event. + // The awaitingFinalEvent flag is set when MainQueue enqueues a final event + // but it hasn't been distributed to this ChildQueue yet. + // HOWEVER: If we've been waiting too long for the final event (>3s), give up and + // proceed with normal timeout logic to prevent infinite waiting. boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted(); - if (agentCompleted && queueSize == 0 && !isInterruptedState) { + + // Track how long we've been waiting for the final event. + // Three cases for the awaiting counter: + // awaitingFinal && queueSize == 0: final event enqueued in MainQueue but not yet + // distributed here — increment timeout counter and give up after MAX timeout. + // awaitingFinal && queueSize > 0: events are still in transit, do nothing — + // the counter is reset below once an event is successfully dequeued. + // !awaitingFinal: not waiting for anything — reset the counter (timeout case; + // the successful-dequeue reset happens below at the event-received path). + if (awaitingFinal && queueSize == 0) { + pollTimeoutsWhileAwaitingFinal++; + if (pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL) { + LOGGER.debug("Waited {} timeouts for final event but it hasn't arrived - proceeding with normal timeout logic (queue={})", + pollTimeoutsWhileAwaitingFinal, System.identityHashCode(queue)); + // Clear the flag on the queue itself, not just the local variable + queue.clearAwaitingFinalEvent(); + awaitingFinal = false; // Also update local variable for this iteration + } + } else if (!awaitingFinal) { + // Poll timed out and we are not awaiting a final event: reset the counter. + // (The successful-dequeue reset is handled separately below.) + pollTimeoutsWhileAwaitingFinal = 0; + } + + if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal) { pollTimeoutsAfterAgentCompleted++; if (pollTimeoutsAfterAgentCompleted >= MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED) { LOGGER.debug("Agent completed with {} consecutive poll timeouts and empty queue, closing for graceful completion (queue={})", @@ -116,11 +160,16 @@ public Flow.Publisher consumeAll() { LOGGER.debug("Agent completed but queue has {} pending events, resetting timeout counter and continuing to poll (queue={})", queueSize, System.identityHashCode(queue)); pollTimeoutsAfterAgentCompleted = 0; // Reset counter when events arrive + } else if (agentCompleted && awaitingFinal) { + LOGGER.debug("Agent completed, awaiting final event (timeout {}/{}), continuing to poll (queue={})", + pollTimeoutsWhileAwaitingFinal, MAX_POLL_TIMEOUTS_AWAITING_FINAL, System.identityHashCode(queue)); + pollTimeoutsAfterAgentCompleted = 0; // Reset counter while awaiting final } continue; } - // Event received - reset timeout counter + // Event received - reset timeout counters pollTimeoutsAfterAgentCompleted = 0; + pollTimeoutsWhileAwaitingFinal = 0; event = item.getEvent(); LOGGER.debug("EventConsumer received event: {} (queue={})", event.getClass().getSimpleName(), System.identityHashCode(queue)); @@ -179,10 +228,11 @@ public Flow.Publisher consumeAll() { // the stream-end signal can reach the client BEFORE the buffered final event, // causing the client to close the connection and never receive the final event. // This is especially important in replicated scenarios where events arrive via Kafka - // and timing is less deterministic. A small delay ensures the buffer flushes. + // and timing is less deterministic. A delay ensures the buffer flushes. + // Increased to 150ms to account for CI environment latency and JVM scheduling delays. if (isFinalSent) { try { - Thread.sleep(50); // 50ms to allow SSE buffer flush + Thread.sleep(150); // 150ms to allow SSE buffer flush in CI environments } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -198,15 +248,17 @@ public Flow.Publisher consumeAll() { return; } } - } finally { - if (!completed) { - LOGGER.debug("EventConsumer finally block: calling tube.complete() for queue {}", System.identityHashCode(queue)); - tube.complete(); - LOGGER.debug("EventConsumer finally block: tube.complete() returned for queue {}", System.identityHashCode(queue)); - } else { - LOGGER.debug("EventConsumer finally block: completed=true, skipping tube.complete() for queue {}", System.identityHashCode(queue)); + } finally { + if (!completed) { + LOGGER.debug("EventConsumer finally block: calling tube.complete() for queue {}", System.identityHashCode(queue)); + tube.complete(); + LOGGER.debug("EventConsumer finally block: tube.complete() returned for queue {}", System.identityHashCode(queue)); + } else { + LOGGER.debug("EventConsumer finally block: completed=true, skipping tube.complete() for queue {}", System.identityHashCode(queue)); + } } - } + }); + // Lambda returns immediately - polling continues on executor thread }); } diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java index 53ae22da4..6226f72e2 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java +++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java @@ -304,6 +304,36 @@ public void taskDone() { */ public abstract int size(); + /** + * Returns whether this queue is awaiting a final event to be delivered. + *

+ * This is used by EventConsumer to determine if it should keep polling even when + * the queue is empty. A final event may still be in-transit through MainEventBusProcessor. + *

+ *

+ * For MainQueue: always returns false (MainQueue cannot be consumed). + * For ChildQueue: returns true if {@link ChildQueue#expectFinalEvent()} was called + * but the final event hasn't been received yet. + *

+ * + * @return true if awaiting a final event, false otherwise + */ + public boolean isAwaitingFinalEvent() { + // Default implementation - overridden by ChildQueue + return false; + } + + /** + * Clears the awaiting final event flag. + *

+ * Default implementation is a no-op for queues that don't track this state. + * ChildQueue overrides this to actually clear the flag. + *

+ */ + public void clearAwaitingFinalEvent() { + // Default no-op implementation - overridden by ChildQueue + } + /** * Closes this event queue gracefully, allowing pending events to be consumed. */ @@ -757,6 +787,11 @@ public int size() { return queue.size(); } + @Override + public boolean isAwaitingFinalEvent() { + return awaitingFinalEvent; + } + @Override public void awaitQueuePollerStart() throws InterruptedException { parent.awaitQueuePollerStart(); @@ -790,6 +825,16 @@ void expectFinalEvent() { LOGGER.debug("ChildQueue {} now awaiting final event", System.identityHashCode(this)); } + /** + * Called by EventConsumer when it has waited too long for the final event. + * This allows normal timeout logic to proceed if the final event never arrives. + */ + @Override + public void clearAwaitingFinalEvent() { + awaitingFinalEvent = false; + LOGGER.debug("ChildQueue {} cleared awaitingFinalEvent flag (timeout)", System.identityHashCode(this)); + } + @Override public void close() { close(false); diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 9a480c44f..bc6651d11 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -12,6 +12,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -68,6 +69,7 @@ import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.spec.UnsupportedOperationError; +import io.a2a.util.Utils; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; import org.slf4j.Logger; @@ -380,6 +382,9 @@ public Task onCancelTask(CancelTaskParams params, ServerCallContext context) thr ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor, eventConsumerExecutor); EventQueue queue = queueManager.createOrTap(task.id()); + EventConsumer consumer = new EventConsumer(queue, eventConsumerExecutor); + + // Call agentExecutor.cancel() to enqueue the CANCELED event RequestContext cancelRequestContext = requestContextBuilder.get() .setTaskId(task.id()) .setContextId(task.contextId()) @@ -387,29 +392,20 @@ public Task onCancelTask(CancelTaskParams params, ServerCallContext context) thr .setServerCallContext(context) .build(); AgentEmitter emitter = new AgentEmitter(cancelRequestContext, queue); - try { - agentExecutor.cancel(cancelRequestContext, emitter); - } catch (TaskNotCancelableError e) { - // Expected error - log at INFO level - LOGGER.info("Task {} is not cancelable", task.id()); - throw e; - } catch (A2AError e) { - // Other A2A errors - log at WARN level with stack trace - LOGGER.warn("Agent cancellation threw A2AError for task {}: {} - {}", - task.id(), e.getClass().getSimpleName(), e.getMessage(), e); - throw e; - } catch (Exception e) { - // Unexpected errors - log at ERROR level - LOGGER.error("Agent cancellation threw unexpected exception for task {}", task.id(), e); - throw new io.a2a.spec.InternalError("Agent cancellation failed: " + e.getMessage()); - } + agentExecutor.cancel(cancelRequestContext, emitter); + + // Cancel any running agent future Optional.ofNullable(runningAgents.get(task.id())) .ifPresent(cf -> cf.cancel(true)); - EventConsumer consumer = new EventConsumer(queue); - EventKind type = resultAggregator.consumeAll(consumer); - if (!(type instanceof Task tempTask)) { + // Consume events with blocking=true to wait for CANCELED state + // The latch in consumeAndBreakOnInterrupt ensures EventConsumer starts before we wait + // CANCELED is a final state, so loop will break naturally when event arrives + // If agentExecutor.cancel() threw TaskNotCancelableError, that A2AError event will also break the loop + ResultAggregator.EventTypeAndInterrupt etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, true); + + if (!(etai.eventType() instanceof Task tempTask)) { throw new InternalError("Agent did not return valid response for cancel"); } @@ -459,7 +455,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte boolean interruptedOrNonBlocking = false; // Create consumer BEFORE starting agent - callback is registered inside registerAndExecuteAgentAsync - EventConsumer consumer = new EventConsumer(queue); + EventConsumer consumer = new EventConsumer(queue, eventConsumerExecutor); EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue, consumer.createAgentRunnableDoneCallback()); @@ -653,7 +649,7 @@ public Flow.Publisher onMessageSendStream( ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor, eventConsumerExecutor); // Create consumer BEFORE starting agent - callback is registered inside registerAndExecuteAgentAsync - EventConsumer consumer = new EventConsumer(queue); + EventConsumer consumer = new EventConsumer(queue, eventConsumerExecutor); EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue, consumer.createAgentRunnableDoneCallback()); @@ -844,7 +840,7 @@ public Flow.Publisher onSubscribeToTask(TaskIdParams params, queue.enqueueEventLocalOnly(task); LOGGER.debug("onSubscribeToTask - enqueued current task state as first event for taskId: {}", params.id()); - EventConsumer consumer = new EventConsumer(queue); + EventConsumer consumer = new EventConsumer(queue, eventConsumerExecutor); Flow.Publisher results = resultAggregator.consumeAndEmit(consumer); LOGGER.debug("onSubscribeToTask - returning publisher for taskId: {}", params.id()); return convertingProcessor(results, item -> (StreamingEventKind) item.getEvent()); diff --git a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java index 06c689eb2..f1f15021e 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java +++ b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java @@ -9,6 +9,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import io.a2a.server.events.EventConsumer; @@ -58,54 +59,12 @@ public Flow.Publisher consumeAndEmit(EventConsumer consumer) { return true; }); - // Wrap the publisher to ensure subscription happens on eventConsumerExecutor - // This prevents EventConsumer polling loop from running on AgentExecutor threads - // which caused thread accumulation when those threads didn't timeout - return new Flow.Publisher() { - @Override - public void subscribe(Flow.Subscriber subscriber) { - // Submit subscription to eventConsumerExecutor to isolate polling work - eventConsumerExecutor.execute(() -> processed.subscribe(subscriber)); - } - }; + // No wrapper needed - EventConsumer.consumeAll() now handles thread offloading internally + // This ensures subscription happens immediately without delay, preventing race condition + // where EventConsumer starts emitting before subscriber is ready + return processed; } - public EventKind consumeAll(EventConsumer consumer) throws A2AError { - AtomicReference returnedEvent = new AtomicReference<>(); - Flow.Publisher allItems = consumer.consumeAll(); - AtomicReference error = new AtomicReference<>(); - consumer( - createTubeConfig(), - allItems, - (item) -> { - Event event = item.getEvent(); - if (event instanceof Message msg) { - message = msg; - if (returnedEvent.get() == null) { - returnedEvent.set(msg); - return false; - } - } - // TaskStore update moved to MainEventBusProcessor - return true; - }, - error::set); - - Throwable err = error.get(); - if (err != null) { - Utils.rethrow(err); - } - - EventKind result = returnedEvent.get(); - if (result != null) { - return result; - } - Task task = taskManager.getTask(); - if (task == null) { - throw new io.a2a.spec.InternalError("No task or message available after consuming all events"); - } - return task; - } public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws A2AError { Flow.Publisher allItems = consumer.consumeAll(); diff --git a/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java b/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java index 17242ba89..29c3c6ce9 100644 --- a/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java +++ b/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java @@ -72,7 +72,7 @@ public void init() { .taskId(TASK_ID) .mainEventBus(mainEventBus) .build().tap(); - eventConsumer = new EventConsumer(eventQueue); + eventConsumer = new EventConsumer(eventQueue, Runnable::run); } @AfterEach @@ -397,7 +397,7 @@ public void testConsumeAllStopsOnQueueClosed() throws Exception { EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus) .mainEventBus(mainEventBus) .build().tap(); - EventConsumer consumer = new EventConsumer(queue); + EventConsumer consumer = new EventConsumer(queue, Runnable::run); // Close the queue immediately queue.close(); @@ -445,7 +445,7 @@ public void testConsumeAllHandlesQueueClosedException() throws Exception { EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus) .mainEventBus(mainEventBus) .build().tap(); - EventConsumer consumer = new EventConsumer(queue); + EventConsumer consumer = new EventConsumer(queue, Runnable::run); // Add a message event (which will complete the stream) Event message = fromJson(MESSAGE_PAYLOAD, Message.class); @@ -499,7 +499,7 @@ public void testConsumeAllTerminatesOnQueueClosedEvent() throws Exception { EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus) .mainEventBus(mainEventBus) .build().tap(); - EventConsumer consumer = new EventConsumer(queue); + EventConsumer consumer = new EventConsumer(queue, Runnable::run); // Enqueue a QueueClosedEvent (poison pill) QueueClosedEvent queueClosedEvent = new QueueClosedEvent(TASK_ID); diff --git a/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java b/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java index a347d9cd0..fcd3e9381 100644 --- a/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java +++ b/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java @@ -258,7 +258,7 @@ void testConsumeAndBreakNonBlocking() throws Exception { // Create real EventConsumer with the queue EventConsumer eventConsumer = - new EventConsumer(queue); + new EventConsumer(queue, Runnable::run); // Close queue after first event to simulate stream ending after processing queue.close(); @@ -306,7 +306,7 @@ void testConsumeAndBreakOnAuthRequired_Blocking() throws Exception { waitForEventProcessing(processor, () -> queue.enqueueEvent(authRequiredTask)); // Create EventConsumer - EventConsumer eventConsumer = new EventConsumer(queue); + EventConsumer eventConsumer = new EventConsumer(queue, Runnable::run); // Call consumeAndBreakOnInterrupt with blocking=true ResultAggregator.EventTypeAndInterrupt result = @@ -348,7 +348,7 @@ void testConsumeAndBreakOnAuthRequired_NonBlocking() throws Exception { waitForEventProcessing(processor, () -> queue.enqueueEvent(authRequiredTask)); // Create EventConsumer - EventConsumer eventConsumer = new EventConsumer(queue); + EventConsumer eventConsumer = new EventConsumer(queue, Runnable::run); // Call consumeAndBreakOnInterrupt with blocking=false ResultAggregator.EventTypeAndInterrupt result = @@ -396,7 +396,7 @@ void testAuthRequiredWithTaskStatusUpdateEvent() throws Exception { waitForEventProcessing(processor, () -> queue.enqueueEvent(authRequiredEvent)); // Create EventConsumer - EventConsumer eventConsumer = new EventConsumer(queue); + EventConsumer eventConsumer = new EventConsumer(queue, Runnable::run); // Call consumeAndBreakOnInterrupt ResultAggregator.EventTypeAndInterrupt result = @@ -442,7 +442,7 @@ void testAuthRequiredWithTaskEvent() throws Exception { waitForEventProcessing(processor, () -> queue.enqueueEvent(authRequiredTask)); // Create EventConsumer - EventConsumer eventConsumer = new EventConsumer(queue); + EventConsumer eventConsumer = new EventConsumer(queue, Runnable::run); // Call consumeAndBreakOnInterrupt ResultAggregator.EventTypeAndInterrupt result = diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index fc621bdb6..a6f838476 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -1921,7 +1921,11 @@ public void testAuthRequiredWorkflow() throws Exception { } }; - Consumer errorHandler = errorRef::set; + Consumer errorHandler = error -> { + if (!isStreamClosedError(error)) { + errorRef.set(error); + } + }; // Wait for subscription to be established CountDownLatch subscriptionLatch = new CountDownLatch(1);