diff --git a/README.md b/README.md
index bd86952da..48d28a8cd 100644
--- a/README.md
+++ b/README.md
@@ -143,7 +143,7 @@ public class WeatherAgentCardProducer {
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
-import io.a2a.server.tasks.TaskUpdater;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.Message;
import io.a2a.spec.Part;
@@ -173,14 +173,12 @@ public class WeatherAgentExecutorProducer {
}
@Override
- public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError {
- TaskUpdater updater = new TaskUpdater(context, eventQueue);
-
+ public void execute(RequestContext context, AgentEmitter agentEmitter) throws JSONRPCError {
// mark the task as submitted and start working on it
if (context.getTask() == null) {
- updater.submit();
+ agentEmitter.submit();
}
- updater.startWork();
+ agentEmitter.startWork();
// extract the text from the message
String userMessage = extractTextFromMessage(context.getMessage());
@@ -189,16 +187,16 @@ public class WeatherAgentExecutorProducer {
String response = weatherAgent.chat(userMessage);
// create the response part
- TextPart responsePart = new TextPart(response, null);
+ TextPart responsePart = new TextPart(response);
List
- *
*
@@ -56,14 +56,12 @@
* }
*
* @Override
- * public void execute(RequestContext context, EventQueue eventQueue) {
- * TaskUpdater updater = new TaskUpdater(context, eventQueue);
- *
+ * public void execute(RequestContext context, AgentEmitter emitter) {
* // Initialize task if this is a new conversation
* if (context.getTask() == null) {
- * updater.submit();
+ * emitter.submit();
* }
- * updater.startWork();
+ * emitter.startWork();
*
* // Extract user input from the message
* String userMessage = context.getUserInput("\n");
@@ -72,14 +70,14 @@
* String weatherData = weatherService.getWeather(userMessage);
*
* // Return result as artifact
- * updater.addArtifact(List.of(new TextPart(weatherData, null)));
- * updater.complete();
+ * emitter.addArtifact(List.of(new TextPart(weatherData, null)));
+ * emitter.complete();
* }
*
* @Override
- * public void cancel(RequestContext context, EventQueue eventQueue) {
+ * public void cancel(RequestContext context, AgentEmitter emitter) {
* // Clean up resources and mark as canceled
- * new TaskUpdater(context, eventQueue).cancel();
+ * emitter.cancel();
* }
* }
* }
@@ -87,16 +85,15 @@
* Streaming Results
* For long-running operations or LLM streaming, enqueue multiple artifacts:
* {@code
- * updater.startWork();
+ * emitter.startWork();
* for (String chunk : llmService.stream(userInput)) {
- * updater.addArtifact(List.of(new TextPart(chunk, null)));
+ * emitter.addArtifact(List.of(new TextPart(chunk, null)));
* }
- * updater.complete(); // Final event closes the queue
+ * emitter.complete(); // Final event closes the queue
* }
*
* @see RequestContext
- * @see io.a2a.server.tasks.TaskUpdater
- * @see io.a2a.server.events.EventQueue
+ * @see AgentEmitter
* @see io.a2a.server.requesthandlers.DefaultRequestHandler
* @see io.a2a.spec.AgentCard
*/
@@ -111,15 +108,15 @@ public interface AgentExecutor {
*
* Important: Don't throw exceptions for business logic errors. Instead, use - * {@code updater.fail(errorMessage)} to communicate failures to the client gracefully. + * {@code emitter.fail(errorMessage)} to communicate failures to the client gracefully. * Only throw {@link A2AError} for truly exceptional conditions. *
* * @param context the request context containing the message, task state, and configuration - * @param eventQueue the queue for enqueueing status updates and artifacts + * @param emitter the agent emitter for sending messages, updating status, and streaming artifacts * @throws A2AError if execution fails catastrophically (exception propagates to client) */ - void execute(RequestContext context, EventQueue eventQueue) throws A2AError; + void execute(RequestContext context, AgentEmitter emitter) throws A2AError; /** * Cancels an ongoing agent execution. @@ -128,11 +125,11 @@ public interface AgentExecutor { * You should: *- * Note: The {@link #execute(RequestContext, EventQueue)} method may still be + * Note: The {@link #execute(RequestContext, AgentEmitter)} method may still be * running on another thread. Use appropriate synchronization or interruption mechanisms * if your agent maintains cancellable state. *
@@ -146,9 +143,9 @@ public interface AgentExecutor { * * * @param context the request context for the task being canceled - * @param eventQueue the queue for enqueueing the cancellation event + * @param emitter the agent emitter for sending the cancellation event * @throws io.a2a.spec.TaskNotCancelableError if this agent does not support cancellation * @throws A2AError if cancellation is supported but failed to execute */ - void cancel(RequestContext context, EventQueue eventQueue) throws A2AError; + void cancel(RequestContext context, AgentEmitter emitter) throws A2AError; } diff --git a/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java b/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java index e79298703..82a8b9e17 100644 --- a/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java +++ b/server-common/src/main/java/io/a2a/server/agentexecution/RequestContext.java @@ -35,7 +35,7 @@ * *
{@code
- * public void execute(RequestContext context, EventQueue queue) {
+ * public void execute(RequestContext context, AgentEmitter emitter) {
* // Check if this is a new conversation or continuation
* Task existingTask = context.getTask();
* if (existingTask == null) {
diff --git a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
index 7c7b28452..239385a39 100644
--- a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
+++ b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
@@ -2,6 +2,7 @@
import java.util.concurrent.Flow;
+import io.a2a.spec.A2AError;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.Message;
@@ -136,6 +137,10 @@ public Flow.Publisher consumeAll() {
LOGGER.debug("Received QueueClosedEvent for task {}, treating as final event",
((QueueClosedEvent) event).getTaskId());
isFinalEvent = true;
+ } else if (event instanceof A2AError) {
+ // A2AError events are terminal - they trigger automatic FAILED state transition
+ LOGGER.debug("Received A2AError event, treating as final event");
+ isFinalEvent = true;
}
// Only send event if it's not a QueueClosedEvent
diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
index 4899a926d..53ae22da4 100644
--- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java
+++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
@@ -13,6 +13,7 @@
import io.a2a.server.tasks.TaskStateProvider;
import io.a2a.spec.Event;
import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskStatusUpdateEvent;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
@@ -431,6 +432,9 @@ public void enqueueItem(EventQueueItem item) {
// We bypass the parent's closed check and enqueue directly
Event event = item.getEvent();
+ // Validate event taskId matches queue taskId
+ validateEventIds(event);
+
// Check if this is a final event BEFORE submitting to MainEventBus
// If it is, notify all children to expect it (so they wait for MainEventBusProcessor)
if (isFinalEvent(event)) {
@@ -458,6 +462,47 @@ public void enqueueItem(EventQueueItem item) {
mainEventBus.submit(taskId, this, item);
}
+ /**
+ * Validates that events with taskId fields match this queue's taskId.
+ *
+ * Validation Rules:
+ *
+ * - Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent: MUST match queue taskId
+ * - Message: taskId is OPTIONAL, not validated (can exist without tasks)
+ * - Other events: no validation
+ * - Null queue taskId: skip validation (initialization phase)
+ *
+ *
+ * @param event the event to validate
+ * @throws IllegalArgumentException if event has mismatched taskId
+ */
+ private void validateEventIds(Event event) {
+ if (taskId == null) {
+ return; // Allow any event during initialization
+ }
+
+ String eventTaskId = null;
+ String eventType = null;
+
+ if (event instanceof Task task) {
+ eventTaskId = task.id();
+ eventType = "Task";
+ } else if (event instanceof TaskStatusUpdateEvent statusEvent) {
+ eventTaskId = statusEvent.taskId();
+ eventType = "TaskStatusUpdateEvent";
+ } else if (event instanceof TaskArtifactUpdateEvent artifactEvent) {
+ eventTaskId = artifactEvent.taskId();
+ eventType = "TaskArtifactUpdateEvent";
+ }
+ // Note: Message.taskId is NOT validated - messages can exist independently
+
+ if (eventTaskId != null && !eventTaskId.equals(taskId)) {
+ throw new IllegalArgumentException(
+ String.format("Event taskId mismatch: queue=%s, event=%s, eventType=%s",
+ taskId, eventTaskId, eventType));
+ }
+ }
+
/**
* Checks if an event represents a final task state.
*/
diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
index 8b3dc6fa3..2a95a849d 100644
--- a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
+++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
@@ -11,6 +11,7 @@
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.TaskManager;
import io.a2a.server.tasks.TaskStore;
+import io.a2a.spec.A2AError;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.InternalError;
@@ -372,7 +373,7 @@ private String extractContextId(Event event) {
* Checks if an event represents a final task state.
*
* @param event the event to check
- * @return true if the event represents a final state (COMPLETED, FAILED, CANCELED, REJECTED, UNKNOWN)
+ * @return true if the event represents a final state (COMPLETED, FAILED, CANCELED, REJECTED, UNKNOWN, or A2AError)
*/
private boolean isFinalEvent(Event event) {
if (event instanceof Task task) {
@@ -380,6 +381,9 @@ private boolean isFinalEvent(Event event) {
&& task.status().state().isFinal();
} else if (event instanceof TaskStatusUpdateEvent statusUpdate) {
return statusUpdate.isFinal();
+ } else if (event instanceof A2AError) {
+ // A2AError events are terminal - they trigger FAILED state transition
+ return true;
}
return false;
}
diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
index 31a1a6670..bd9c27500 100644
--- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
+++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
@@ -36,6 +36,7 @@
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.MainEventBusProcessor;
import io.a2a.server.events.QueueManager;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.server.tasks.PushNotificationConfigStore;
import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.ResultAggregator;
@@ -98,7 +99,7 @@
* Transport calls {@link #onMessageSend(MessageSendParams, ServerCallContext)}
* Initialize {@link TaskManager} and {@link RequestContext}
* Create or tap {@link EventQueue} via {@link QueueManager}
- * Execute {@link AgentExecutor#execute(RequestContext, EventQueue)} asynchronously in background thread pool
+ * Execute {@link AgentExecutor#execute(RequestContext, AgentEmitter)} asynchronously in background thread pool
* Consume events from queue on Vert.x worker thread via {@link EventConsumer}
* For blocking=true: wait for agent completion and full event consumption
* Return {@link Task} or {@link Message} to transport
@@ -109,7 +110,7 @@
*
* - Transport calls {@link #onMessageSendStream(MessageSendParams, ServerCallContext)}
* - Initialize components (same as blocking)
- * - Execute {@link AgentExecutor#execute(RequestContext, EventQueue)} asynchronously
+ * - Execute {@link AgentExecutor#execute(RequestContext, AgentEmitter)} asynchronously
* - Return {@link java.util.concurrent.Flow.Publisher Flow.Publisher}<StreamingEventKind> immediately
* - Events stream to client as they arrive in the queue
* - On client disconnect: continue consumption in background (fire-and-forget)
@@ -128,7 +129,7 @@
* Threading Model
*
* - Vert.x worker threads: Execute request handler methods (onMessageSend, etc.)
- * - Agent-executor pool (@Internal): Execute {@link AgentExecutor#execute(RequestContext, EventQueue)}
+ * - Agent-executor pool (@Internal): Execute {@link AgentExecutor#execute(RequestContext, AgentEmitter)}
* - Background cleanup: {@link java.util.concurrent.CompletableFuture CompletableFuture} async tasks
*
*
@@ -378,14 +379,14 @@ public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws
ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor, eventConsumerExecutor);
EventQueue queue = queueManager.createOrTap(task.id());
- agentExecutor.cancel(
- requestContextBuilder.get()
- .setTaskId(task.id())
- .setContextId(task.contextId())
- .setTask(task)
- .setServerCallContext(context)
- .build(),
- queue);
+ RequestContext cancelRequestContext = requestContextBuilder.get()
+ .setTaskId(task.id())
+ .setContextId(task.contextId())
+ .setTask(task)
+ .setServerCallContext(context)
+ .build();
+ AgentEmitter emitter = new AgentEmitter(cancelRequestContext, queue);
+ agentExecutor.cancel(cancelRequestContext, emitter);
Optional.ofNullable(runningAgents.get(task.id()))
.ifPresent(cf -> cf.cancel(true));
@@ -439,15 +440,14 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
boolean interruptedOrNonBlocking = false;
- EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue);
+ // Create consumer BEFORE starting agent - callback is registered inside registerAndExecuteAgentAsync
+ EventConsumer consumer = new EventConsumer(queue);
+
+ EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue, consumer.createAgentRunnableDoneCallback());
+
ResultAggregator.EventTypeAndInterrupt etai = null;
EventKind kind = null; // Declare outside try block so it's in scope for return
try {
- EventConsumer consumer = new EventConsumer(queue);
-
- // This callback must be added before we start consuming. Otherwise,
- // any errors thrown by the producerRunnable are not picked up by the consumer
- producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
// Get agent future before consuming (for blocking calls to wait for agent completion)
CompletableFuture agentFuture = runningAgents.get(queueTaskId);
@@ -621,11 +621,10 @@ public Flow.Publisher onMessageSendStream(
ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor, eventConsumerExecutor);
- EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue);
-
- // Move consumer creation and callback registration outside try block
+ // Create consumer BEFORE starting agent - callback is registered inside registerAndExecuteAgentAsync
EventConsumer consumer = new EventConsumer(queue);
- producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
+
+ EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue, consumer.createAgentRunnableDoneCallback());
// Store cancel callback in context for closeHandler to access
// When client disconnects, closeHandler can call this to stop EventConsumer polling loop
@@ -863,21 +862,33 @@ private boolean shouldAddPushInfo(MessageSendParams params) {
*
* This design avoids blocking agent-executor threads waiting for consumer polling to start,
* eliminating cascading delays when Vert.x worker threads are busy.
+ *
+ * @param doneCallback Callback to invoke when agent completes - MUST be added before starting CompletableFuture
*/
- private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestContext requestContext, EventQueue queue) {
+ private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestContext requestContext, EventQueue queue, EnhancedRunnable.DoneCallback doneCallback) {
LOGGER.debug("Registering agent execution for task {}, runningAgents.size() before: {}", taskId, runningAgents.size());
logThreadStats("AGENT START");
EnhancedRunnable runnable = new EnhancedRunnable() {
@Override
public void run() {
LOGGER.debug("Agent execution starting for task {}", taskId);
- agentExecutor.execute(requestContext, queue);
+ AgentEmitter emitter = new AgentEmitter(requestContext, queue);
+ try {
+ agentExecutor.execute(requestContext, emitter);
+ } catch (A2AError e) {
+ LOGGER.debug("Agent execution failed for task {} {}", taskId, e);
+ emitter.fail(e);
+ }
LOGGER.debug("Agent execution completed for task {}", taskId);
// The consumer (running on the Vert.x worker thread) handles queue lifecycle.
// This avoids blocking agent-executor threads waiting for worker threads.
}
};
+ // CRITICAL: Add callback BEFORE starting CompletableFuture to avoid race condition
+ // If agent completes very fast, whenComplete can fire before caller adds callbacks
+ runnable.addDoneCallback(doneCallback);
+
CompletableFuture cf = CompletableFuture.runAsync(runnable, executor)
.whenComplete((v, err) -> {
if (err != null) {
diff --git a/server-common/src/main/java/io/a2a/server/tasks/AgentEmitter.java b/server-common/src/main/java/io/a2a/server/tasks/AgentEmitter.java
new file mode 100644
index 000000000..f45cbfeea
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/tasks/AgentEmitter.java
@@ -0,0 +1,621 @@
+package io.a2a.server.tasks;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.a2a.server.agentexecution.RequestContext;
+import io.a2a.server.events.EventQueue;
+import io.a2a.spec.A2AError;
+import io.a2a.spec.Artifact;
+import io.a2a.spec.Event;
+import io.a2a.spec.Message;
+import io.a2a.spec.Part;
+import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
+import io.a2a.spec.TaskState;
+import io.a2a.spec.TaskStatus;
+import io.a2a.spec.TaskStatusUpdateEvent;
+import io.a2a.spec.TextPart;
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Helper for emitting events from AgentExecutor implementations.
+ *
+ * AgentEmitter provides a simplified API for agents to communicate with clients through
+ * the A2A protocol. It handles both task lifecycle management and direct message sending,
+ * automatically populating events with correct task and context IDs from the RequestContext.
+ *
+ *
Core Capabilities
+ *
+ * - Task Lifecycle: {@link #submit()}, {@link #startWork()}, {@link #complete()},
+ * {@link #fail()}, {@link #cancel()}, {@link #reject()}
+ * - Message Sending: {@link #sendMessage(String)}, {@link #sendMessage(List)},
+ * {@link #sendMessage(List, Map)}
+ * - Artifact Streaming: {@link #addArtifact(List)}, {@link #addArtifact(List, String, String, Map)}
+ * - Auth/Input Requirements: {@link #requiresAuth()}, {@link #requiresInput()}
+ * - Custom Events: {@link #taskBuilder()}, {@link #messageBuilder()}, {@link #addTask(Task)}, {@link #emitEvent(Event)}
+ *
+ *
+ * Usage Patterns
+ *
+ * Simple Message Response (No Task)
+ * {@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * String response = processRequest(context.getUserInput("\n"));
+ * emitter.sendMessage(response);
+ * }
+ * }
+ *
+ * Task Lifecycle with Artifacts
+ * {@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * if (context.getTask() == null) {
+ * emitter.submit(); // Create task in SUBMITTED state
+ * }
+ * emitter.startWork(); // Transition to WORKING
+ *
+ * // Process and stream results
+ * List> results = doWork(context.getUserInput("\n"));
+ * emitter.addArtifact(results);
+ *
+ * emitter.complete(); // Mark as COMPLETED
+ * }
+ * }
+ *
+ * Streaming Response (LLM)
+ * {@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * emitter.startWork();
+ *
+ * for (String chunk : llmService.stream(context.getUserInput("\n"))) {
+ * emitter.addArtifact(List.of(new TextPart(chunk)));
+ * }
+ *
+ * emitter.complete();
+ * }
+ * }
+ *
+ * Event ID Management
+ * All emitted events are automatically populated with:
+ *
+ * - taskId: From RequestContext (may be null for message-only responses)
+ * - contextId: From RequestContext
+ * - messageId: Generated UUID for messages
+ * - artifactId: Generated UUID for artifacts (unless explicitly provided)
+ *
+ *
+ * Events are validated by the EventQueue to ensure taskId correctness.
+ *
+ * @see io.a2a.server.agentexecution.AgentExecutor
+ * @see RequestContext
+ * @see EventQueue
+ * @since 1.0.0
+ */
+public class AgentEmitter {
+ private final EventQueue eventQueue;
+ private final @Nullable String taskId;
+ private final @Nullable String contextId;
+ private volatile boolean terminalStateReached = false;
+
+ /**
+ * Creates a new AgentEmitter for the given request context and event queue.
+ *
+ * @param context the request context containing task and context IDs
+ * @param eventQueue the event queue for enqueueing events
+ */
+ public AgentEmitter(RequestContext context, EventQueue eventQueue) {
+ this.eventQueue = eventQueue;
+ this.taskId = context.getTaskId();
+ this.contextId = context.getContextId();
+ }
+
+ private void updateStatus(TaskState taskState) {
+ updateStatus(taskState, null, taskState.isFinal());
+ }
+
+ /**
+ * Updates the task status to the given state with an optional message.
+ *
+ * @param taskState the new task state
+ * @param message optional message to include with the status update
+ */
+ public void updateStatus(TaskState taskState, @Nullable Message message) {
+ updateStatus(taskState, message, taskState.isFinal());
+ }
+
+ /**
+ * Updates the task status to the given state with an optional message and finality flag.
+ *
+ * @param state the new task state
+ * @param message optional message to include with the status update
+ * @param isFinal whether this is a final status (prevents further updates)
+ */
+ private void updateStatus(TaskState state, @Nullable Message message, boolean isFinal) {
+ if (isFinal) {
+ if (terminalStateReached) {
+ throw new IllegalStateException("Cannot update task status - terminal state already reached");
+ }
+ terminalStateReached = true;
+ } else {
+ if (terminalStateReached) {
+ throw new IllegalStateException("Cannot update task status - terminal state already reached");
+ }
+ }
+
+ TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder()
+ .taskId(taskId)
+ .contextId(contextId)
+ .status(new TaskStatus(state, message, null))
+ .build();
+ eventQueue.enqueueEvent(event);
+ }
+
+ /**
+ * Returns the context ID for this emitter.
+ *
+ * @return the context ID, or null if not available
+ */
+ public @Nullable String getContextId() {
+ return this.contextId;
+ }
+
+ /**
+ * Returns the task ID for this emitter.
+ *
+ * @return the task ID, or null if no task is associated
+ */
+ public @Nullable String getTaskId() {
+ return this.taskId;
+ }
+
+ /**
+ * Adds an artifact with the given parts to the task.
+ *
+ * @param parts the parts to include in the artifact
+ */
+ public void addArtifact(List> parts) {
+ addArtifact(parts, null, null, null);
+ }
+
+ /**
+ * Adds an artifact with the given parts, artifact ID, name, and metadata.
+ *
+ * @param parts the parts to include in the artifact
+ * @param artifactId optional artifact ID (generated if null)
+ * @param name optional artifact name
+ * @param metadata optional metadata map
+ */
+ public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata) {
+ addArtifact(parts, artifactId, name, metadata, null, null);
+ }
+
+ /**
+ * Adds an artifact with all optional parameters.
+ *
+ * @param parts the parts to include in the artifact
+ * @param artifactId optional artifact ID (generated if null)
+ * @param name optional artifact name
+ * @param metadata optional metadata map
+ * @param append whether to append to an existing artifact
+ * @param lastChunk whether this is the last chunk in a streaming sequence
+ */
+ public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata,
+ @Nullable Boolean append, @Nullable Boolean lastChunk) {
+ if (artifactId == null) {
+ artifactId = UUID.randomUUID().toString();
+ }
+ TaskArtifactUpdateEvent event = TaskArtifactUpdateEvent.builder()
+ .taskId(taskId)
+ .contextId(contextId)
+ .artifact(
+ Artifact.builder()
+ .artifactId(artifactId)
+ .name(name)
+ .parts(parts)
+ .metadata(metadata)
+ .build()
+ )
+ .append(append)
+ .lastChunk(lastChunk)
+ .build();
+ eventQueue.enqueueEvent(event);
+ }
+
+ /**
+ * Marks the task as COMPLETED.
+ */
+ public void complete() {
+ complete(null);
+ }
+
+ /**
+ * Marks the task as COMPLETED with an optional message.
+ *
+ * @param message optional message to include with completion
+ */
+ public void complete(@Nullable Message message) {
+ updateStatus(TaskState.COMPLETED, message);
+ }
+
+ /**
+ * Marks the task as FAILED.
+ */
+ public void fail() {
+ fail((Message) null);
+ }
+
+ /**
+ * Marks the task as FAILED with an optional message.
+ *
+ * @param message optional message to include with failure
+ */
+ public void fail(@Nullable Message message) {
+ updateStatus(TaskState.FAILED, message);
+ }
+
+ /**
+ * Enqueues an A2A error event which will automatically transition the task to FAILED.
+ *
+ * Use this when you need to fail the task with a specific A2A error (such as
+ * {@link io.a2a.spec.UnsupportedOperationError}, {@link io.a2a.spec.InvalidRequestError},
+ * {@link io.a2a.spec.TaskNotFoundError}, etc.) that should be sent to the client.
+ *
+ *
+ * The error event is enqueued and the MainEventBusProcessor will automatically transition
+ * the task to FAILED state. This ensures thread-safe state transitions without race conditions,
+ * as the single-threaded MainEventBusProcessor handles all state updates.
+ *
+ *
+ * Error events are terminal (stop event consumption) and trigger automatic FAILED state transition.
+ * The error details are sent to the originating client only, while the FAILED status is replicated
+ * to all nodes in multi-instance deployments.
+ *
+ * Example usage:
+ *
{@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * if (!isSupported(context.getMessage())) {
+ * emitter.fail(new UnsupportedOperationError("Feature not supported"));
+ * return;
+ * }
+ * // ... normal processing
+ * }
+ * }
+ *
+ * @param error the A2A error to enqueue and send to the client
+ * @since 1.0.0
+ */
+ public void fail(A2AError error) {
+ eventQueue.enqueueEvent(error);
+ // Status transition happens automatically in MainEventBusProcessor
+ // This eliminates race conditions from concurrent terminal state updates
+ }
+
+ /**
+ * Marks the task as SUBMITTED.
+ */
+ public void submit() {
+ submit(null);
+ }
+
+ /**
+ * Marks the task as SUBMITTED with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void submit(@Nullable Message message) {
+ updateStatus(TaskState.SUBMITTED, message);
+ }
+
+ /**
+ * Marks the task as WORKING (actively being processed).
+ */
+ public void startWork() {
+ startWork(null);
+ }
+
+ /**
+ * Marks the task as WORKING with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void startWork(@Nullable Message message) {
+ updateStatus(TaskState.WORKING, message);
+ }
+
+ /**
+ * Marks the task as CANCELED.
+ */
+ public void cancel() {
+ cancel(null);
+ }
+
+ /**
+ * Marks the task as CANCELED with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void cancel(@Nullable Message message) {
+ updateStatus(TaskState.CANCELED, message);
+ }
+
+ /**
+ * Marks the task as REJECTED.
+ */
+ public void reject() {
+ reject(null);
+ }
+
+ /**
+ * Marks the task as REJECTED with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void reject(@Nullable Message message) {
+ updateStatus(TaskState.REJECTED, message);
+ }
+
+ /**
+ * Marks the task as INPUT_REQUIRED, indicating the agent needs user input to continue.
+ */
+ public void requiresInput() {
+ requiresInput(null, false);
+ }
+
+ /**
+ * Marks the task as INPUT_REQUIRED with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void requiresInput(@Nullable Message message) {
+ requiresInput(message, false);
+ }
+
+ /**
+ * Marks the task as INPUT_REQUIRED with a finality flag.
+ *
+ * @param isFinal whether this is a final status (prevents further updates)
+ */
+ public void requiresInput(boolean isFinal) {
+ requiresInput(null, isFinal);
+ }
+
+ /**
+ * Marks the task as INPUT_REQUIRED with an optional message and finality flag.
+ *
+ * @param message optional message to include
+ * @param isFinal whether this is a final status (prevents further updates)
+ */
+ public void requiresInput(@Nullable Message message, boolean isFinal) {
+ updateStatus(TaskState.INPUT_REQUIRED, message, isFinal);
+ }
+
+ /**
+ * Marks the task as AUTH_REQUIRED, indicating the agent needs authentication to continue.
+ */
+ public void requiresAuth() {
+ requiresAuth(null, false);
+ }
+
+ /**
+ * Marks the task as AUTH_REQUIRED with an optional message.
+ *
+ * @param message optional message to include
+ */
+ public void requiresAuth(@Nullable Message message) {
+ requiresAuth(message, false);
+ }
+
+ /**
+ * Marks the task as AUTH_REQUIRED with a finality flag.
+ *
+ * @param isFinal whether this is a final status (prevents further updates)
+ */
+ public void requiresAuth(boolean isFinal) {
+ requiresAuth(null, isFinal);
+ }
+
+ /**
+ * Marks the task as AUTH_REQUIRED with an optional message and finality flag.
+ *
+ * @param message optional message to include
+ * @param isFinal whether this is a final status (prevents further updates)
+ */
+ public void requiresAuth(@Nullable Message message, boolean isFinal) {
+ updateStatus(TaskState.AUTH_REQUIRED, message, isFinal);
+ }
+
+ /**
+ * Creates a new agent message with the given parts and metadata.
+ * Pre-populates the message with agent role, task ID, context ID, and a generated message ID.
+ *
+ * @param parts the parts to include in the message
+ * @param metadata optional metadata to attach to the message
+ * @return a new Message object ready to be sent
+ */
+ public Message newAgentMessage(List> parts, @Nullable Map metadata) {
+ return Message.builder()
+ .role(Message.Role.AGENT)
+ .taskId(taskId)
+ .contextId(contextId)
+ .messageId(UUID.randomUUID().toString())
+ .metadata(metadata)
+ .parts(parts)
+ .build();
+ }
+
+ /**
+ * Sends a simple text message to the client.
+ * Convenience method for agents that respond with plain text without creating a task.
+ *
+ * @param text the text content to send
+ */
+ public void sendMessage(String text) {
+ sendMessage(List.of(new TextPart(text)));
+ }
+
+ /**
+ * Sends a message with custom parts (text, images, etc.) to the client.
+ * Use this for rich responses that don't require task lifecycle management.
+ *
+ * @param parts the message parts to send
+ */
+ public void sendMessage(List> parts) {
+ sendMessage(parts, null);
+ }
+
+ /**
+ * Sends a message with parts and metadata to the client.
+ * Creates an agent message with the current task and context IDs (if available)
+ * and enqueues it to the event queue.
+ *
+ * @param parts the message parts to send
+ * @param metadata optional metadata to attach to the message
+ */
+ public void sendMessage(List> parts, @Nullable Map metadata) {
+ Message message = newAgentMessage(parts, metadata);
+ eventQueue.enqueueEvent(message);
+ }
+
+ /**
+ * Sends an existing Message object directly to the client.
+ *
+ * Use this when you need to forward or echo an existing message without creating a new one.
+ * The message is enqueued as-is, preserving its messageId, metadata, and all other fields.
+ *
+ *
+ * Note: This is typically used for forwarding user messages or preserving specific
+ * message properties. For most cases, prefer {@link #sendMessage(String)} or
+ * {@link #sendMessage(List)} which create new agent messages with generated IDs.
+ *
+ * Example usage:
+ *
{@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * // Echo the user's message back
+ * emitter.sendMessage(context.getMessage());
+ * }
+ * }
+ *
+ * @param message the message to send to the client
+ * @since 1.0.0
+ */
+ public void sendMessage(Message message) {
+ eventQueue.enqueueEvent(message);
+ }
+
+ /**
+ * Adds a custom Task object to be sent to the client.
+ *
+ * Use this when you need to create a Task with specific fields (history, artifacts, etc.)
+ * that the convenience methods like {@link #submit()}, {@link #startWork()}, or
+ * {@link #complete()} don't provide.
+ *
+ *
+ * Typical usage pattern: Build a task with {@link #taskBuilder()}, customize it,
+ * then add it with this method.
+ *
+ * Example usage:
+ *
{@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * // Create a task with specific status and history
+ * Task task = emitter.taskBuilder()
+ * .status(new TaskStatus(TaskState.SUBMITTED))
+ * .history(List.of(context.getMessage()))
+ * .build();
+ * emitter.addTask(task);
+ * }
+ * }
+ *
+ * @param task the task to add
+ * @since 1.0.0
+ */
+ public void addTask(Task task) {
+ eventQueue.enqueueEvent(task);
+ }
+
+ /**
+ * Emits a custom Event object to the client.
+ *
+ * This is a general-purpose method for emitting any Event type. Most agents should use the
+ * convenience methods ({@link #sendMessage(String)}, {@link #addTask(Task)},
+ * {@link #addArtifact(List)}, {@link #complete()}, etc.), but this method provides flexibility
+ * for agents that need to create and emit custom events using the event builders.
+ *
+ * Example usage:
+ *
{@code
+ * public void execute(RequestContext context, AgentEmitter emitter) {
+ * // Create a custom TaskStatusUpdateEvent
+ * TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder()
+ * .taskId(context.getTaskId())
+ * .contextId(context.getContextId())
+ * .status(new TaskStatus(TaskState.WORKING))
+ * .isFinal(false)
+ * .build();
+ * emitter.emitEvent(event);
+ * }
+ * }
+ *
+ * @param event the event to emit
+ * @since 1.0.0
+ */
+ public void emitEvent(Event event) {
+ eventQueue.enqueueEvent(event);
+ }
+
+ /**
+ * Creates a Task.Builder pre-populated with the correct task and context IDs.
+ * Agents can customize other Task fields (status, artifacts, etc.) before calling build().
+ *
+ * Example usage:
+ *
{@code
+ * Task task = emitter.taskBuilder()
+ * .status(new TaskStatus(TaskState.WORKING))
+ * .build();
+ * }
+ *
+ * @return a Task.Builder with id and contextId already set
+ */
+ public Task.Builder taskBuilder() {
+ return Task.builder()
+ .id(taskId)
+ .contextId(contextId);
+ }
+
+ /**
+ * Creates a Message.Builder pre-populated with agent defaults.
+ * Sets taskId only if non-null (messages can exist independently of tasks).
+ *
+ * Pre-populated fields:
+ *
+ * - taskId - set only if this AgentEmitter has a non-null taskId
+ * - contextId - current context ID
+ * - role - Message.Role.AGENT
+ * - messageId - generated UUID
+ *
+ *
+ * Example usage:
+ *
{@code
+ * Message msg = emitter.messageBuilder()
+ * .parts(List.of(new TextPart("Hello")))
+ * .metadata(Map.of("key", "value"))
+ * .build();
+ * }
+ *
+ * @return a Message.Builder with common agent fields already set
+ */
+ public Message.Builder messageBuilder() {
+ Message.Builder builder = Message.builder()
+ .contextId(contextId)
+ .role(Message.Role.AGENT)
+ .messageId(UUID.randomUUID().toString());
+
+ // Only set taskId if present (messages can exist without tasks)
+ if (taskId != null) {
+ builder.taskId(taskId);
+ }
+
+ return builder;
+ }
+
+}
diff --git a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
index 506b3f3b6..b36868a63 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
@@ -166,7 +166,8 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
// Determine interrupt behavior
boolean shouldInterrupt = false;
boolean isFinalEvent = (event instanceof Task task && task.status().state().isFinal())
- || (event instanceof TaskStatusUpdateEvent tsue && tsue.isFinal());
+ || (event instanceof TaskStatusUpdateEvent tsue && tsue.isFinal())
+ || (event instanceof A2AError); // A2AError events are terminal
boolean isAuthRequired = (event instanceof Task task && task.status().state() == TaskState.AUTH_REQUIRED)
|| (event instanceof TaskStatusUpdateEvent tsue && tsue.status().state() == TaskState.AUTH_REQUIRED);
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java b/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java
index 948ec596c..abb764755 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java
@@ -1,5 +1,6 @@
package io.a2a.server.tasks;
+import static io.a2a.spec.TaskState.FAILED;
import static io.a2a.spec.TaskState.SUBMITTED;
import static io.a2a.util.Assert.checkNotNullParam;
import static io.a2a.util.Utils.appendArtifactToTask;
@@ -10,6 +11,7 @@
import java.util.List;
import java.util.Map;
+import io.a2a.spec.A2AError;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.InternalError;
@@ -112,6 +114,42 @@ public boolean process(Event event, boolean isReplicated) throws A2AServerExcept
isFinal = saveTaskEvent(taskStatusUpdateEvent, isReplicated);
} else if (event instanceof TaskArtifactUpdateEvent taskArtifactUpdateEvent) {
isFinal = saveTaskEvent(taskArtifactUpdateEvent, isReplicated);
+ } else if (event instanceof A2AError) {
+ // A2AError events trigger automatic transition to FAILED state
+ // Error details are NOT persisted in TaskStore (client-specific)
+ // Only the FAILED status is persisted and replicated across nodes
+
+ // A2AError events don't have taskId/contextId fields, so we need to ensure
+ // we have these from the existing task or TaskManager state
+ if (taskId == null) {
+ // No task context - A2AError event will be distributed to clients but no state update
+ LOGGER.debug("A2AError event without task context - skipping state update");
+ return true; // Return true (is final) to stop event consumption
+ }
+
+ // Ensure we have contextId - get from existing task if not set
+ String errorContextId = contextId;
+ if (errorContextId == null) {
+ Task existingTask = getTask();
+ if (existingTask != null) {
+ errorContextId = existingTask.contextId();
+ }
+ }
+
+ // Only create status update if we have contextId
+ if (errorContextId != null) {
+ LOGGER.debug("A2AError event detected, transitioning task {} to FAILED", taskId);
+ TaskStatusUpdateEvent failedEvent = TaskStatusUpdateEvent.builder()
+ .taskId(taskId)
+ .contextId(errorContextId)
+ .status(new TaskStatus(FAILED))
+ .build();
+ isFinal = saveTaskEvent(failedEvent, isReplicated);
+ } else {
+ // Can't update status without contextId, but error is still terminal
+ LOGGER.debug("A2AError event for task {} without contextId - skipping state update", taskId);
+ isFinal = true;
+ }
}
return isFinal;
}
diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskUpdater.java b/server-common/src/main/java/io/a2a/server/tasks/TaskUpdater.java
deleted file mode 100644
index c28e55c5c..000000000
--- a/server-common/src/main/java/io/a2a/server/tasks/TaskUpdater.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package io.a2a.server.tasks;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import io.a2a.server.agentexecution.RequestContext;
-import io.a2a.server.events.EventQueue;
-import io.a2a.spec.Artifact;
-import io.a2a.spec.Message;
-import io.a2a.spec.Part;
-import io.a2a.spec.TaskArtifactUpdateEvent;
-import io.a2a.spec.TaskState;
-import io.a2a.spec.TaskStatus;
-import io.a2a.spec.TaskStatusUpdateEvent;
-import org.jspecify.annotations.Nullable;
-
-public class TaskUpdater {
- private final EventQueue eventQueue;
- private final @Nullable String taskId;
- private final @Nullable String contextId;
- private final AtomicBoolean terminalStateReached = new AtomicBoolean(false);
- private final Object stateLock = new Object();
-
- public TaskUpdater(RequestContext context, EventQueue eventQueue) {
- this.eventQueue = eventQueue;
- this.taskId = context.getTaskId();
- this.contextId = context.getContextId();
- }
-
- private void updateStatus(TaskState taskState) {
- updateStatus(taskState, null, taskState.isFinal());
- }
-
- public void updateStatus(TaskState taskState, @Nullable Message message) {
- updateStatus(taskState, message, taskState.isFinal());
- }
-
- public void updateStatus(TaskState state, @Nullable Message message, boolean isFinal) {
- synchronized (stateLock) {
- // Check if we're already in a terminal state
- if (terminalStateReached.get()) {
- throw new IllegalStateException("Cannot update task status - terminal state already reached");
- }
-
- // If this is a final state, set the flag
- if (isFinal) {
- terminalStateReached.set(true);
- }
-
- TaskStatusUpdateEvent event = TaskStatusUpdateEvent.builder()
- .taskId(taskId)
- .contextId(contextId)
- .status(new TaskStatus(state, message, null))
- .build();
- eventQueue.enqueueEvent(event);
- }
- }
-
- public @Nullable String getContextId() {
- return this.contextId;
- }
-
- public @Nullable String getTaskId() {
- return this.taskId;
- }
-
- public void addArtifact(List> parts) {
- addArtifact(parts, null, null, null);
- }
-
- public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata) {
- addArtifact(parts, artifactId, name, metadata, null, null);
- }
-
- public void addArtifact(List> parts, @Nullable String artifactId, @Nullable String name, @Nullable Map metadata,
- @Nullable Boolean append, @Nullable Boolean lastChunk) {
- if (artifactId == null) {
- artifactId = UUID.randomUUID().toString();
- }
- TaskArtifactUpdateEvent event = TaskArtifactUpdateEvent.builder()
- .taskId(taskId)
- .contextId(contextId)
- .artifact(
- Artifact.builder()
- .artifactId(artifactId)
- .name(name)
- .parts(parts)
- .metadata(metadata)
- .build()
- )
- .append(append)
- .lastChunk(lastChunk)
- .build();
- eventQueue.enqueueEvent(event);
- }
-
- public void complete() {
- complete(null);
- }
-
- public void complete(@Nullable Message message) {
- updateStatus(TaskState.COMPLETED, message);
- }
-
- public void fail() {
- fail(null);
- }
-
- public void fail(@Nullable Message message) {
- updateStatus(TaskState.FAILED, message);
- }
-
- public void submit() {
- submit(null);
- }
-
- public void submit(@Nullable Message message) {
- updateStatus(TaskState.SUBMITTED, message);
- }
-
- public void startWork() {
- startWork(null);
- }
-
- public void startWork(@Nullable Message message) {
- updateStatus(TaskState.WORKING, message);
- }
-
- public void cancel() {
- cancel(null);
- }
-
- public void cancel(@Nullable Message message) {
- updateStatus(TaskState.CANCELED, message);
- }
-
- public void reject() {
- reject(null);
- }
-
- public void reject(@Nullable Message message) {
- updateStatus(TaskState.REJECTED, message);
- }
-
- public void requiresInput() {
- requiresInput(null, false);
- }
-
- public void requiresInput(@Nullable Message message) {
- requiresInput(message, false);
- }
-
- public void requiresInput(boolean isFinal) {
- requiresInput(null, isFinal);
- }
-
- public void requiresInput(@Nullable Message message, boolean isFinal) {
- updateStatus(TaskState.INPUT_REQUIRED, message, isFinal);
- }
-
- public void requiresAuth() {
- requiresAuth(null, false);
- }
-
- public void requiresAuth(@Nullable Message message) {
- requiresAuth(message, false);
- }
-
- public void requiresAuth(boolean isFinal) {
- requiresAuth(null, isFinal);
- }
-
- public void requiresAuth(@Nullable Message message, boolean isFinal) {
- updateStatus(TaskState.AUTH_REQUIRED, message, isFinal);
- }
-
- public Message newAgentMessage(List> parts, @Nullable Map metadata) {
- return Message.builder()
- .role(Message.Role.AGENT)
- .taskId(taskId)
- .contextId(contextId)
- .messageId(UUID.randomUUID().toString())
- .metadata(metadata)
- .parts(parts)
- .build();
- }
-
-}
diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
index 968937b89..1089c7e6f 100644
--- a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
+++ b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
@@ -23,7 +23,7 @@
import io.a2a.jsonrpc.common.json.JsonUtil;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
-import io.a2a.server.events.EventQueue;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.EventQueueUtil;
import io.a2a.server.events.InMemoryQueueManager;
@@ -86,16 +86,16 @@ public class AbstractA2ARequestHandlerTest {
public void init() {
executor = new AgentExecutor() {
@Override
- public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
+ public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
if (agentExecutorExecute != null) {
- agentExecutorExecute.invoke(context, eventQueue);
+ agentExecutorExecute.invoke(context, agentEmitter);
}
}
@Override
- public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
+ public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
if (agentExecutorCancel != null) {
- agentExecutorCancel.invoke(context, eventQueue);
+ agentExecutorCancel.invoke(context, agentEmitter);
}
}
};
@@ -165,7 +165,7 @@ private static String loadPreferredTransportFromProperties() {
}
protected interface AgentExecutorMethod {
- void invoke(RequestContext context, EventQueue eventQueue) throws A2AError;
+ void invoke(RequestContext context, AgentEmitter agentEmitter) throws A2AError;
}
/**
diff --git a/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java b/server-common/src/test/java/io/a2a/server/tasks/AgentEmitterTest.java
similarity index 87%
rename from server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java
rename to server-common/src/test/java/io/a2a/server/tasks/AgentEmitterTest.java
index a5ec77f01..853037a9c 100644
--- a/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java
+++ b/server-common/src/test/java/io/a2a/server/tasks/AgentEmitterTest.java
@@ -30,7 +30,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-public class TaskUpdaterTest {
+public class AgentEmitterTest {
public static final String TEST_TASK_ID = "test-task-id";
public static final String TEST_TASK_CONTEXT_ID = "test-task-context-id";
@@ -48,7 +48,7 @@ public class TaskUpdaterTest {
EventQueue eventQueue;
private MainEventBus mainEventBus;
private MainEventBusProcessor mainEventBusProcessor;
- private TaskUpdater taskUpdater;
+ private AgentEmitter agentEmitter;
@@ -69,7 +69,7 @@ public void init() {
.setTaskId(TEST_TASK_ID)
.setContextId(TEST_TASK_CONTEXT_ID)
.build();
- taskUpdater = new TaskUpdater(context, eventQueue);
+ agentEmitter = new AgentEmitter(context, eventQueue);
}
@AfterEach
@@ -81,7 +81,7 @@ public void cleanup() {
@Test
public void testAddArtifactWithCustomIdAndName() throws Exception {
- taskUpdater.addArtifact(SAMPLE_PARTS, "custom-artifact-id", "Custom Artifact", null);
+ agentEmitter.addArtifact(SAMPLE_PARTS, "custom-artifact-id", "Custom Artifact", null);
EventQueueItem item = eventQueue.dequeueEventItem(5000);
assertNotNull(item);
Event event = item.getEvent();
@@ -101,147 +101,147 @@ public void testAddArtifactWithCustomIdAndName() throws Exception {
@Test
public void testCompleteWithoutMessage() throws Exception {
- taskUpdater.complete();
+ agentEmitter.complete();
checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, null);
}
@Test
public void testCompleteWithMessage() throws Exception {
- taskUpdater.complete(SAMPLE_MESSAGE);
+ agentEmitter.complete(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, SAMPLE_MESSAGE);
}
@Test
public void testSubmitWithoutMessage() throws Exception {
- taskUpdater.submit();
+ agentEmitter.submit();
checkTaskStatusUpdateEventOnQueue(false, TaskState.SUBMITTED, null);
}
@Test
public void testSubmitWithMessage() throws Exception {
- taskUpdater.submit(SAMPLE_MESSAGE);
+ agentEmitter.submit(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(false, TaskState.SUBMITTED, SAMPLE_MESSAGE);
}
@Test
public void testStartWorkWithoutMessage() throws Exception {
- taskUpdater.startWork();
+ agentEmitter.startWork();
checkTaskStatusUpdateEventOnQueue(false, TaskState.WORKING, null);
}
@Test
public void testStartWorkWithMessage() throws Exception {
- taskUpdater.startWork(SAMPLE_MESSAGE);
+ agentEmitter.startWork(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(false, TaskState.WORKING, SAMPLE_MESSAGE);
}
@Test
public void testFailedWithoutMessage() throws Exception {
- taskUpdater.fail();
+ agentEmitter.fail();
checkTaskStatusUpdateEventOnQueue(true, TaskState.FAILED, null);
}
@Test
public void testFailedWithMessage() throws Exception {
- taskUpdater.fail(SAMPLE_MESSAGE);
+ agentEmitter.fail(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(true, TaskState.FAILED, SAMPLE_MESSAGE);
}
@Test
public void testCanceledWithoutMessage() throws Exception {
- taskUpdater.cancel();
+ agentEmitter.cancel();
checkTaskStatusUpdateEventOnQueue(true, TaskState.CANCELED, null);
}
@Test
public void testCanceledWithMessage() throws Exception {
- taskUpdater.cancel(SAMPLE_MESSAGE);
+ agentEmitter.cancel(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(true, TaskState.CANCELED, SAMPLE_MESSAGE);
}
@Test
public void testRejectWithoutMessage() throws Exception {
- taskUpdater.reject();
+ agentEmitter.reject();
checkTaskStatusUpdateEventOnQueue(true, TaskState.REJECTED, null);
}
@Test
public void testRejectWithMessage() throws Exception {
- taskUpdater.reject(SAMPLE_MESSAGE);
+ agentEmitter.reject(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(true, TaskState.REJECTED, SAMPLE_MESSAGE);
}
@Test
public void testRequiresInputWithoutMessage() throws Exception {
- taskUpdater.requiresInput();
+ agentEmitter.requiresInput();
checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, null);
}
@Test
public void testRequiresInputWithMessage() throws Exception {
- taskUpdater.requiresInput(SAMPLE_MESSAGE);
+ agentEmitter.requiresInput(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, SAMPLE_MESSAGE);
}
@Test
public void testRequiresInputWithFinalTrue() throws Exception {
- taskUpdater.requiresInput(true);
+ agentEmitter.requiresInput(true);
checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, null);
}
@Test
public void testRequiresInputWithMessageAndFinalTrue() throws Exception {
- taskUpdater.requiresInput(SAMPLE_MESSAGE, true);
+ agentEmitter.requiresInput(SAMPLE_MESSAGE, true);
checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, SAMPLE_MESSAGE);
}
@Test
public void testRequiresAuthWithoutMessage() throws Exception {
- taskUpdater.requiresAuth();
+ agentEmitter.requiresAuth();
checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, null);
}
@Test
public void testRequiresAuthWithMessage() throws Exception {
- taskUpdater.requiresAuth(SAMPLE_MESSAGE);
+ agentEmitter.requiresAuth(SAMPLE_MESSAGE);
checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, SAMPLE_MESSAGE);
}
@Test
public void testRequiresAuthWithFinalTrue() throws Exception {
- taskUpdater.requiresAuth(true);
+ agentEmitter.requiresAuth(true);
checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, null);
}
@Test
public void testRequiresAuthWithMessageAndFinalTrue() throws Exception {
- taskUpdater.requiresAuth(SAMPLE_MESSAGE, true);
+ agentEmitter.requiresAuth(SAMPLE_MESSAGE, true);
checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, SAMPLE_MESSAGE);
}
@Test
public void testNonTerminalStateUpdatesAllowed() throws Exception {
// Non-terminal states should be allowed multiple times
- taskUpdater.submit();
+ agentEmitter.submit();
checkTaskStatusUpdateEventOnQueue(false, TaskState.SUBMITTED, null);
- taskUpdater.startWork();
+ agentEmitter.startWork();
checkTaskStatusUpdateEventOnQueue(false, TaskState.WORKING, null);
- taskUpdater.requiresInput();
+ agentEmitter.requiresInput();
checkTaskStatusUpdateEventOnQueue(false, TaskState.INPUT_REQUIRED, null);
- taskUpdater.requiresAuth();
+ agentEmitter.requiresAuth();
checkTaskStatusUpdateEventOnQueue(false, TaskState.AUTH_REQUIRED, null);
// Should still be able to complete
- taskUpdater.complete();
+ agentEmitter.complete();
checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, null);
}
@Test
public void testNewAgentMessage() throws Exception {
- Message message = taskUpdater.newAgentMessage(SAMPLE_PARTS, null);
+ Message message = agentEmitter.newAgentMessage(SAMPLE_PARTS, null);
assertEquals(AGENT, message.role());
assertEquals(TEST_TASK_ID, message.taskId());
@@ -254,7 +254,7 @@ public void testNewAgentMessage() throws Exception {
@Test
public void testNewAgentMessageWithMetadata() throws Exception {
Map metadata = Map.of("key", "value");
- Message message = taskUpdater.newAgentMessage(SAMPLE_PARTS, metadata);
+ Message message = agentEmitter.newAgentMessage(SAMPLE_PARTS, metadata);
assertEquals(AGENT, message.role());
assertEquals(TEST_TASK_ID, message.taskId());
@@ -266,7 +266,7 @@ public void testNewAgentMessageWithMetadata() throws Exception {
@Test
public void testAddArtifactWithAppendTrue() throws Exception {
- taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, null);
+ agentEmitter.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, null);
EventQueueItem item = eventQueue.dequeueEventItem(5000);
assertNotNull(item);
Event event = item.getEvent();
@@ -287,7 +287,7 @@ public void testAddArtifactWithAppendTrue() throws Exception {
@Test
public void testAddArtifactWithLastChunkTrue() throws Exception {
- taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, null, true);
+ agentEmitter.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, null, true);
EventQueueItem item = eventQueue.dequeueEventItem(5000);
assertNotNull(item);
Event event = item.getEvent();
@@ -304,7 +304,7 @@ public void testAddArtifactWithLastChunkTrue() throws Exception {
@Test
public void testAddArtifactWithAppendAndLastChunk() throws Exception {
- taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, false);
+ agentEmitter.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, false);
EventQueueItem item = eventQueue.dequeueEventItem(5000);
assertNotNull(item);
Event event = item.getEvent();
@@ -320,7 +320,7 @@ public void testAddArtifactWithAppendAndLastChunk() throws Exception {
@Test
public void testAddArtifactGeneratesIdWhenNull() throws Exception {
- taskUpdater.addArtifact(SAMPLE_PARTS, null, "Test Artifact", null);
+ agentEmitter.addArtifact(SAMPLE_PARTS, null, "Test Artifact", null);
EventQueueItem item = eventQueue.dequeueEventItem(5000);
assertNotNull(item);
Event event = item.getEvent();
@@ -340,11 +340,11 @@ public void testAddArtifactGeneratesIdWhenNull() throws Exception {
@Test
public void testTerminalStateProtectionAfterComplete() throws Exception {
// Complete the task first
- taskUpdater.complete();
+ agentEmitter.complete();
checkTaskStatusUpdateEventOnQueue(true, TaskState.COMPLETED, null);
// Try to update status again - should throw RuntimeException
- RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.startWork());
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.startWork());
assertEquals("Cannot update task status - terminal state already reached", exception.getMessage());
// Verify no additional events were queued
@@ -354,11 +354,11 @@ public void testTerminalStateProtectionAfterComplete() throws Exception {
@Test
public void testTerminalStateProtectionAfterFail() throws Exception {
// Fail the task first
- taskUpdater.fail();
+ agentEmitter.fail();
checkTaskStatusUpdateEventOnQueue(true, TaskState.FAILED, null);
// Try to update status again - should throw RuntimeException
- RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.complete());
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.complete());
assertEquals("Cannot update task status - terminal state already reached", exception.getMessage());
// Verify no additional events were queued
@@ -368,11 +368,11 @@ public void testTerminalStateProtectionAfterFail() throws Exception {
@Test
public void testTerminalStateProtectionAfterReject() throws Exception {
// Reject the task first
- taskUpdater.reject();
+ agentEmitter.reject();
checkTaskStatusUpdateEventOnQueue(true, TaskState.REJECTED, null);
// Try to update status again - should throw RuntimeException
- RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.startWork());
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.startWork());
assertEquals("Cannot update task status - terminal state already reached", exception.getMessage());
// Verify no additional events were queued
@@ -382,11 +382,11 @@ public void testTerminalStateProtectionAfterReject() throws Exception {
@Test
public void testTerminalStateProtectionAfterCancel() throws Exception {
// Cancel the task first
- taskUpdater.cancel();
+ agentEmitter.cancel();
checkTaskStatusUpdateEventOnQueue(true, TaskState.CANCELED, null);
// Try to update status again - should throw RuntimeException
- RuntimeException exception = assertThrows(RuntimeException.class, () -> taskUpdater.submit());
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> agentEmitter.submit());
assertEquals("Cannot update task status - terminal state already reached", exception.getMessage());
// Verify no additional events were queued
@@ -398,7 +398,7 @@ public void testConcurrentCompletionAttempts() throws Exception {
// This test simulates race condition between multiple completion attempts
Thread thread1 = new Thread(() -> {
try {
- taskUpdater.complete();
+ agentEmitter.complete();
} catch (RuntimeException e) {
// Expected for one of the threads
}
@@ -406,7 +406,7 @@ public void testConcurrentCompletionAttempts() throws Exception {
Thread thread2 = new Thread(() -> {
try {
- taskUpdater.fail();
+ agentEmitter.fail();
} catch (RuntimeException e) {
// Expected for one of the threads
}
diff --git a/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java b/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java
index 848ced9ec..9d16a86ea 100644
--- a/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java
+++ b/tck/src/main/java/io/a2a/tck/server/AgentExecutorProducer.java
@@ -1,20 +1,20 @@
package io.a2a.tck.server;
+import java.util.List;
+
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
-import io.a2a.server.events.EventQueue;
-import io.a2a.server.tasks.TaskUpdater;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.A2AError;
import io.a2a.spec.Task;
import io.a2a.spec.TaskNotCancelableError;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TaskStatusUpdateEvent;
-import java.util.List;
@ApplicationScoped
public class AgentExecutorProducer {
@@ -27,7 +27,7 @@ public AgentExecutor agentExecutor() {
private static class FireAndForgetAgentExecutor implements AgentExecutor {
@Override
- public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
+ public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
Task task = context.getTask();
if (task == null) {
@@ -46,7 +46,7 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
.status(new TaskStatus(TaskState.SUBMITTED))
.history(List.of(context.getMessage()))
.build();
- eventQueue.enqueueEvent(task);
+ agentEmitter.addTask(task);
}
// Sleep to allow task state persistence before TCK subscribe test
@@ -59,10 +59,9 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
Thread.currentThread().interrupt();
}
}
- TaskUpdater updater = new TaskUpdater(context, eventQueue);
// Immediately set to WORKING state
- updater.startWork();
+ agentEmitter.startWork();
System.out.println("====> task set to WORKING, starting background execution");
// Method returns immediately - task continues in background
@@ -70,7 +69,7 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
}
@Override
- public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
+ public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
System.out.println("====> task cancel request received");
Task task = context.getTask();
if (task == null) {
@@ -87,15 +86,7 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws A2AErro
throw new TaskNotCancelableError();
}
- TaskUpdater updater = new TaskUpdater(context, eventQueue);
- updater.cancel();
- eventQueue.enqueueEvent(new TaskStatusUpdateEvent(
- task.id(),
- new TaskStatus(TaskState.CANCELED),
- task.contextId(),
- true, // isFinal - TaskState.CANCELED is a final state
- null));
-
+ agentEmitter.cancel();
System.out.println("====> task canceled");
}
diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
index 364d2275f..40839e7fb 100644
--- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
+++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
@@ -7,8 +7,7 @@
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
-import io.a2a.server.events.EventQueue;
-import io.a2a.server.tasks.TaskUpdater;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.A2AError;
import io.a2a.spec.InvalidParamsError;
import io.a2a.spec.Message;
@@ -25,23 +24,22 @@ public class AgentExecutorProducer {
public AgentExecutor agentExecutor() {
return new AgentExecutor() {
@Override
- public void execute(RequestContext context, EventQueue eventQueue) throws A2AError {
- TaskUpdater updater = new TaskUpdater(context, eventQueue);
+ public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
String taskId = context.getTaskId();
// Special handling for multi-event test
if (taskId != null && taskId.startsWith("multi-event-test")) {
// First call: context.getTask() == null (new task)
if (context.getTask() == null) {
- updater.startWork();
+ agentEmitter.startWork();
// Return immediately - queue stays open because task is in WORKING state
return;
} else {
// Second call: context.getTask() != null (existing task)
- updater.addArtifact(
+ agentEmitter.addArtifact(
List.of(new TextPart("Second message artifact")),
"artifact-2", "Second Artifact", null);
- updater.complete();
+ agentEmitter.complete();
return;
}
}
@@ -50,8 +48,8 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
if (taskId != null && taskId.startsWith("input-required-test")) {
// First call: context.getTask() == null (new task)
if (context.getTask() == null) {
- updater.startWork();
- updater.requiresInput(updater.newAgentMessage(
+ agentEmitter.startWork();
+ agentEmitter.requiresInput(agentEmitter.newAgentMessage(
List.of(new TextPart("Please provide additional information")),
context.getMessage().metadata()));
// Return immediately - queue stays open because task is in INPUT_REQUIRED state
@@ -62,23 +60,26 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
throw new InvalidParamsError("We didn't get the expected input");
}
// Second call: context.getTask() != null (input provided)
- updater.startWork();
- updater.complete();
+ agentEmitter.startWork();
+ agentEmitter.complete();
return;
}
}
if (context.getTaskId().equals("task-not-supported-123")) {
- eventQueue.enqueueEvent(new UnsupportedOperationError());
+ throw new UnsupportedOperationError();
+ }
+ if (context.getMessage() != null) {
+ agentEmitter.sendMessage(context.getMessage());
+ } else {
+ agentEmitter.addTask(context.getTask());
}
- eventQueue.enqueueEvent(context.getMessage() != null ? context.getMessage() : context.getTask());
}
@Override
- public void cancel(RequestContext context, EventQueue eventQueue) throws A2AError {
+ public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
if (context.getTask().id().equals("cancel-task-123")) {
- TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
- taskUpdater.cancel();
+ agentEmitter.cancel();
} else if (context.getTask().id().equals("cancel-task-not-supported-123")) {
throw new UnsupportedOperationError();
}
diff --git a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
index 2bd8c8453..4eb05ecb8 100644
--- a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
+++ b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
@@ -23,7 +23,7 @@
import io.a2a.server.requesthandlers.AbstractA2ARequestHandlerTest;
import io.a2a.server.requesthandlers.DefaultRequestHandler;
import io.a2a.server.requesthandlers.RequestHandler;
-import io.a2a.server.tasks.TaskUpdater;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.AgentCapabilities;
import io.a2a.spec.AgentCard;
import io.a2a.spec.AgentExtension;
@@ -36,7 +36,6 @@
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.TextPart;
import io.a2a.spec.UnsupportedOperationError;
-
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.testing.StreamRecorder;
@@ -102,13 +101,12 @@ public void testOnCancelTaskSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorCancel = (context, eventQueue) -> {
+ agentExecutorCancel = (context, agentEmitter) -> {
// We need to cancel the task or the EventConsumer never finds a 'final' event.
// Looking at the Python implementation, they typically use AgentExecutors that
// don't support cancellation. So my theory is the Agent updates the task to the CANCEL status
io.a2a.spec.Task task = context.getTask();
- TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
- taskUpdater.cancel();
+ agentEmitter.cancel();
};
CancelTaskRequest request = CancelTaskRequest.newBuilder()
@@ -133,7 +131,7 @@ public void testOnCancelTaskNotSupported() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorCancel = (context, eventQueue) -> {
+ agentExecutorCancel = (context, agentEmitter) -> {
throw new UnsupportedOperationError();
};
@@ -163,8 +161,8 @@ public void testOnCancelTaskNotFound() throws Exception {
@Test
public void testOnMessageNewMessageSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
StreamRecorder streamRecorder = sendMessageRequest(handler);
@@ -180,8 +178,8 @@ public void testOnMessageNewMessageSuccess() throws Exception {
public void testOnMessageNewMessageWithExistingTaskSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
StreamRecorder streamRecorder = sendMessageRequest(handler);
Assertions.assertNull(streamRecorder.getError());
@@ -195,8 +193,8 @@ public void testOnMessageNewMessageWithExistingTaskSuccess() throws Exception {
@Test
public void testOnMessageError() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(new UnsupportedOperationError());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.fail(new UnsupportedOperationError());
};
StreamRecorder streamRecorder = sendMessageRequest(handler);
assertGrpcError(streamRecorder, Status.Code.UNIMPLEMENTED);
@@ -225,8 +223,8 @@ public void testSetPushNotificationConfigSuccess() throws Exception {
@Test
public void testGetPushNotificationConfigSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
// first set the task push notification config
@@ -284,8 +282,8 @@ public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception {
@Test
public void testOnMessageStreamNewMessageSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
StreamRecorder streamRecorder = sendStreamingMessageRequest(handler);
@@ -302,8 +300,8 @@ public void testOnMessageStreamNewMessageSuccess() throws Exception {
@Test
public void testOnMessageStreamNewMessageExistingTaskSuccess() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
io.a2a.spec.Task task = io.a2a.spec.Task.builder(AbstractA2ARequestHandlerTest.MINIMAL_TASK)
@@ -426,10 +424,10 @@ public void testOnMessageStreamNewMessageSendPushNotificationSuccess() throws Ex
.status(new io.a2a.spec.TaskStatus(io.a2a.spec.TaskState.COMPLETED))
.build());
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// Hardcode the events to send here
for (Event event : events) {
- eventQueue.enqueueEvent(event);
+ agentEmitter.emitEvent(event);
}
};
@@ -507,8 +505,8 @@ public void testOnSubscribeExistingTaskSuccess() throws Exception {
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
queueManager.createOrTap(AbstractA2ARequestHandlerTest.MINIMAL_TASK.id());
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
StreamRecorder streamRecorder = StreamRecorder.create();
@@ -625,8 +623,8 @@ public void testOnMessageStreamInternalError() throws Exception {
public void testListPushNotificationConfig() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
StreamRecorder pushRecorder = createTaskPushNotificationConfigRequest(handler,
@@ -651,8 +649,8 @@ public void testListPushNotificationConfigNotSupported() throws Exception {
AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(true, false);
GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
ListTaskPushNotificationConfigRequest request = ListTaskPushNotificationConfigRequest.newBuilder()
@@ -668,8 +666,8 @@ public void testListPushNotificationConfigNoPushConfigStore() {
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(executor, taskStore, queueManager, null, mainEventBusProcessor, internalExecutor, internalExecutor);
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
ListTaskPushNotificationConfigRequest request = ListTaskPushNotificationConfigRequest.newBuilder()
@@ -683,8 +681,8 @@ public void testListPushNotificationConfigNoPushConfigStore() {
@Test
public void testListPushNotificationConfigTaskNotFound() {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
ListTaskPushNotificationConfigRequest request = ListTaskPushNotificationConfigRequest.newBuilder()
@@ -699,8 +697,8 @@ public void testListPushNotificationConfigTaskNotFound() {
public void testDeletePushNotificationConfig() throws Exception {
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
StreamRecorder pushRecorder = createTaskPushNotificationConfigRequest(handler, AbstractA2ARequestHandlerTest.MINIMAL_TASK.id(),
AbstractA2ARequestHandlerTest.MINIMAL_TASK.id());
@@ -722,8 +720,8 @@ public void testDeletePushNotificationConfigNotSupported() throws Exception {
AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(true, false);
GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
DeleteTaskPushNotificationConfigRequest request = DeleteTaskPushNotificationConfigRequest.newBuilder()
.setId(AbstractA2ARequestHandlerTest.MINIMAL_TASK.id())
@@ -761,15 +759,14 @@ public void testStreamingDoesNotBlockMainThread() throws Exception {
CountDownLatch streamStarted = new CountDownLatch(1);
GrpcHandler.setStreamingSubscribedRunnable(streamStarted::countDown);
CountDownLatch eventProcessed = new CountDownLatch(1);
-
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// Wait a bit to ensure the main thread continues
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- eventQueue.enqueueEvent(context.getMessage());
+ agentEmitter.sendMessage(context.getMessage());
};
// Start streaming with a custom StreamObserver
@@ -927,8 +924,8 @@ public ServerCallContext create(StreamObserver streamObserver) {
}
};
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
SendMessageRequest request = SendMessageRequest.newBuilder()
@@ -1068,8 +1065,8 @@ public ServerCallContext create(StreamObserver streamObserver) {
}
};
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
SendMessageRequest request = SendMessageRequest.newBuilder()
@@ -1119,8 +1116,8 @@ public ServerCallContext create(StreamObserver streamObserver) {
}
};
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
SendMessageRequest request = SendMessageRequest.newBuilder()
diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
index 3e925e266..f7ab20fba 100644
--- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
+++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
@@ -22,6 +22,8 @@
import io.a2a.jsonrpc.common.wrappers.CancelTaskRequest;
import io.a2a.jsonrpc.common.wrappers.CancelTaskResponse;
+import io.a2a.jsonrpc.common.wrappers.CreateTaskPushNotificationConfigRequest;
+import io.a2a.jsonrpc.common.wrappers.CreateTaskPushNotificationConfigResponse;
import io.a2a.jsonrpc.common.wrappers.DeleteTaskPushNotificationConfigRequest;
import io.a2a.jsonrpc.common.wrappers.DeleteTaskPushNotificationConfigResponse;
import io.a2a.jsonrpc.common.wrappers.GetExtendedAgentCardRequest;
@@ -39,8 +41,6 @@
import io.a2a.jsonrpc.common.wrappers.SendMessageResponse;
import io.a2a.jsonrpc.common.wrappers.SendStreamingMessageRequest;
import io.a2a.jsonrpc.common.wrappers.SendStreamingMessageResponse;
-import io.a2a.jsonrpc.common.wrappers.CreateTaskPushNotificationConfigRequest;
-import io.a2a.jsonrpc.common.wrappers.CreateTaskPushNotificationConfigResponse;
import io.a2a.jsonrpc.common.wrappers.SubscribeToTaskRequest;
import io.a2a.server.ServerCallContext;
import io.a2a.server.auth.UnauthenticatedUser;
@@ -48,7 +48,6 @@
import io.a2a.server.requesthandlers.AbstractA2ARequestHandlerTest;
import io.a2a.server.requesthandlers.DefaultRequestHandler;
import io.a2a.server.tasks.ResultAggregator;
-import io.a2a.server.tasks.TaskUpdater;
import io.a2a.spec.AgentCapabilities;
import io.a2a.spec.AgentCard;
import io.a2a.spec.AgentExtension;
@@ -119,13 +118,12 @@ public void testOnCancelTaskSuccess() throws Exception {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorCancel = (context, eventQueue) -> {
+ agentExecutorCancel = (context, agentEmitter) -> {
// We need to cancel the task or the EventConsumer never finds a 'final' event.
// Looking at the Python implementation, they typically use AgentExecutors that
// don't support cancellation. So my theory is the Agent updates the task to the CANCEL status
Task task = context.getTask();
- TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
- taskUpdater.cancel();
+ agentEmitter.cancel();
};
CancelTaskRequest request = new CancelTaskRequest("111", new TaskIdParams(MINIMAL_TASK.id()));
@@ -144,7 +142,7 @@ public void testOnCancelTaskNotSupported() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorCancel = (context, eventQueue) -> {
+ agentExecutorCancel = (context, agentEmitter) -> {
throw new UnsupportedOperationError();
};
@@ -168,8 +166,8 @@ public void testOnCancelTaskNotFound() {
@Test
public void testOnMessageNewMessageSuccess() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
.taskId(MINIMAL_TASK.id())
@@ -185,8 +183,8 @@ public void testOnMessageNewMessageSuccess() {
public void testOnMessageNewMessageWithExistingTaskSuccess() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
.taskId(MINIMAL_TASK.id())
@@ -203,8 +201,8 @@ public void testOnMessageError() {
// See testMessageOnErrorMocks() for a test more similar to the Python implementation, using mocks for
// EventConsumer.consumeAll()
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(new UnsupportedOperationError());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.fail(new UnsupportedOperationError());
};
Message message = Message.builder(MESSAGE)
.taskId(MINIMAL_TASK.id())
@@ -242,8 +240,8 @@ public void testOnMessageErrorMocks() {
@Test
public void testOnMessageStreamNewMessageSuccess() throws InterruptedException {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
Message message = Message.builder(MESSAGE)
@@ -323,13 +321,13 @@ public void testOnMessageStreamNewMessageMultipleEventsSuccess() throws Interrup
.build();
// Configure the agent executor to enqueue multiple events
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// Enqueue the task with WORKING state
- eventQueue.enqueueEvent(taskEvent);
+ agentEmitter.emitEvent(taskEvent);
// Enqueue an artifact update event
- eventQueue.enqueueEvent(artifactEvent);
+ agentEmitter.emitEvent(artifactEvent);
// Enqueue a status update event to complete the task (this is the "final" event)
- eventQueue.enqueueEvent(statusEvent);
+ agentEmitter.emitEvent(statusEvent);
};
Message message = Message.builder(MESSAGE)
@@ -490,8 +488,8 @@ public void onComplete() {
@Test
public void testOnMessageStreamNewMessageExistingTaskSuccess() throws Exception {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
Task task = Task.builder(MINIMAL_TASK)
@@ -664,8 +662,8 @@ public void testSetPushNotificationConfigSuccess() {
public void testGetPushNotificationConfigSuccess() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -716,10 +714,11 @@ public void testOnMessageStreamNewMessageSendPushNotificationSuccess() throws Ex
.status(new TaskStatus(TaskState.COMPLETED))
.build());
- agentExecutorExecute = (context, eventQueue) -> {
+
+ agentExecutorExecute = (context, agentEmitter) -> {
// Hardcode the events to send here
for (Event event : events) {
- eventQueue.enqueueEvent(event);
+ agentEmitter.emitEvent(event);
}
};
@@ -809,10 +808,10 @@ public void testOnSubscribeExistingTaskSuccess() {
taskStore.save(MINIMAL_TASK, false);
queueManager.createOrTap(MINIMAL_TASK.id());
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// The only thing hitting the agent is the onMessageSend() and we should use the message
- eventQueue.enqueueEvent(context.getMessage());
- //eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentEmitter.sendMessage(context.getMessage());
+ //agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
SubscribeToTaskRequest request = new SubscribeToTaskRequest("1", new TaskIdParams(MINIMAL_TASK.id()));
@@ -1265,8 +1264,8 @@ public void testOnMessageSendTaskIdMismatch() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = ((context, eventQueue) -> {
- eventQueue.enqueueEvent(MINIMAL_TASK);
+ agentExecutorExecute = ((context, agentEmitter) -> {
+ agentEmitter.emitEvent(MINIMAL_TASK);
});
SendMessageRequest request = new SendMessageRequest("1",
new MessageSendParams(MESSAGE, null, null));
@@ -1281,8 +1280,8 @@ public void testOnMessageStreamTaskIdMismatch() throws InterruptedException {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = ((context, eventQueue) -> {
- eventQueue.enqueueEvent(MINIMAL_TASK);
+ agentExecutorExecute = ((context, agentEmitter) -> {
+ agentEmitter.emitEvent(MINIMAL_TASK);
});
SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null));
@@ -1332,8 +1331,8 @@ public void onComplete() {
public void testListPushNotificationConfig() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -1363,8 +1362,8 @@ public void testListPushNotificationConfigNotSupported() {
AgentCard card = createAgentCard(true, false);
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -1391,8 +1390,8 @@ public void testListPushNotificationConfigNoPushConfigStore() {
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(executor, taskStore, queueManager, null, mainEventBusProcessor, internalExecutor, internalExecutor);
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
ListTaskPushNotificationConfigRequest listRequest
@@ -1408,8 +1407,8 @@ public void testListPushNotificationConfigNoPushConfigStore() {
@Test
public void testListPushNotificationConfigTaskNotFound() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
ListTaskPushNotificationConfigRequest listRequest
@@ -1426,8 +1425,8 @@ public void testListPushNotificationConfigTaskNotFound() {
public void testDeletePushNotificationConfig() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -1454,8 +1453,8 @@ public void testDeletePushNotificationConfigNotSupported() {
AgentCard card = createAgentCard(true, false);
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -1483,8 +1482,8 @@ public void testDeletePushNotificationConfigNoPushConfigStore() {
DefaultRequestHandler.create(executor, taskStore, queueManager, null, mainEventBusProcessor, internalExecutor, internalExecutor);
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getTask() != null ? context.getTask() : context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.emitEvent(context.getTask() != null ? context.getTask() : context.getMessage());
};
TaskPushNotificationConfig taskPushConfig
@@ -1526,14 +1525,14 @@ public void testStreamingDoesNotBlockMainThread() throws Exception {
CountDownLatch streamStarted = new CountDownLatch(1);
CountDownLatch eventProcessed = new CountDownLatch(1);
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// Wait a bit to ensure the main thread continues
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- eventQueue.enqueueEvent(context.getMessage());
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
@@ -1734,8 +1733,8 @@ public void testRequiredExtensionProvidedSuccess() {
requestedExtensions
);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
@@ -1892,8 +1891,8 @@ public void testCompatibleVersionSuccess() {
"1.1" // Compatible version (same major version)
);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
@@ -1928,8 +1927,8 @@ public void testNoVersionDefaultsToCurrentVersionSuccess() {
JSONRPCHandler handler = new JSONRPCHandler(agentCard, requestHandler, internalExecutor);
// Use default callContext (no version - should default to 1.0)
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
Message message = Message.builder(MESSAGE)
diff --git a/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java b/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java
index 2d1c19b84..b0dc58141 100644
--- a/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java
+++ b/transport/rest/src/test/java/io/a2a/transport/rest/handler/RestHandlerTest.java
@@ -15,7 +15,7 @@
import io.a2a.server.ServerCallContext;
import io.a2a.server.auth.UnauthenticatedUser;
import io.a2a.server.requesthandlers.AbstractA2ARequestHandlerTest;
-import io.a2a.server.tasks.TaskUpdater;
+import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.AgentCapabilities;
import io.a2a.spec.AgentCard;
import io.a2a.spec.AgentExtension;
@@ -87,8 +87,8 @@ public void testListTasksInvalidStatus() {
@Test
public void testSendMessage() throws InvalidProtocolBufferException {
RestHandler handler = new RestHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """
{
@@ -167,13 +167,12 @@ public void testCancelTaskSuccess() {
RestHandler handler = new RestHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK, false);
- agentExecutorCancel = (context, eventQueue) -> {
+ agentExecutorCancel = (context, agentEmitter) -> {
// We need to cancel the task or the EventConsumer never finds a 'final' event.
// Looking at the Python implementation, they typically use AgentExecutors that
// don't support cancellation. So my theory is the Agent updates the task to the CANCEL status
Task task = context.getTask();
- TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
- taskUpdater.cancel();
+ agentEmitter.cancel();
};
RestHandler.HTTPRestResponse response = handler.cancelTask(MINIMAL_TASK.id(), "", callContext);
@@ -197,8 +196,8 @@ public void testCancelTaskNotFound() {
@Test
public void testSendStreamingMessageSuccess() {
RestHandler handler = new RestHandler(CARD, requestHandler, internalExecutor);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """
{
@@ -357,14 +356,14 @@ public void testStreamingDoesNotBlockMainThread() throws Exception {
AtomicBoolean eventReceived = new AtomicBoolean(false);
CountDownLatch streamStarted = new CountDownLatch(1);
CountDownLatch eventProcessed = new CountDownLatch(1);
- agentExecutorExecute = (context, eventQueue) -> {
+ agentExecutorExecute = (context, agentEmitter) -> {
// Wait a bit to ensure the main thread continues
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- eventQueue.enqueueEvent(context.getMessage());
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """
@@ -602,8 +601,8 @@ public void testRequiredExtensionProvidedSuccess() {
requestedExtensions
);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """
@@ -796,8 +795,8 @@ public void testCompatibleVersionSuccess() {
"1.1" // Compatible version (same major version)
);
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """
@@ -844,8 +843,8 @@ public void testNoVersionDefaultsToCurrentVersionSuccess() {
RestHandler handler = new RestHandler(agentCard, requestHandler, internalExecutor);
// Use default callContext (no version - should default to 1.0)
- agentExecutorExecute = (context, eventQueue) -> {
- eventQueue.enqueueEvent(context.getMessage());
+ agentExecutorExecute = (context, agentEmitter) -> {
+ agentEmitter.sendMessage(context.getMessage());
};
String requestBody = """