From b20f7e4091a08990018e6dae5da400ea5955f117 Mon Sep 17 00:00:00 2001
From: Kabir Khan
Date: Thu, 5 Feb 2026 17:05:03 +0000
Subject: [PATCH 1/3] fix: Race condition in
testNonBlockingWithMultipleMessages
---
.../server/grpc/quarkus/A2ATestResource.java | 16 ++++++++++++
.../server/apps/quarkus/A2ATestRoutes.java | 14 +++++++++++
.../server/rest/quarkus/A2ATestRoutes.java | 14 +++++++++++
.../apps/common/AbstractA2AServerTest.java | 22 ++++++++++++++++
.../a2a/server/apps/common/TestUtilsBean.java | 25 +++++++++++++++++++
5 files changed, 91 insertions(+)
diff --git a/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java b/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java
index 4136eb70b..eff180172 100644
--- a/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java
+++ b/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java
@@ -137,4 +137,20 @@ public Response savePushNotificationConfigInStore(@PathParam("taskId") String ta
testUtilsBean.saveTaskPushNotificationConfig(taskId, notificationConfig);
return Response.ok().build();
}
+
+ @POST
+ @Path("/queue/awaitPollerStart/{taskId}")
+ public Response awaitQueuePollerStart(@PathParam("taskId") String taskId) throws InterruptedException {
+ testUtilsBean.awaitQueuePollerStart(taskId);
+ return Response.ok().build();
+ }
+
+ @POST
+ @Path("/queue/awaitChildCountStable/{taskId}/{expectedCount}/{timeoutMs}")
+ public Response awaitChildQueueCountStable(@PathParam("taskId") String taskId,
+ @PathParam("expectedCount") int expectedCount,
+ @PathParam("timeoutMs") long timeoutMs) throws InterruptedException {
+ boolean stable = testUtilsBean.awaitChildQueueCountStable(taskId, expectedCount, timeoutMs);
+ return Response.ok(String.valueOf(stable), TEXT_PLAIN).build();
+ }
}
diff --git a/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java b/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java
index 12eae5a4d..27081a4df 100644
--- a/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java
+++ b/reference/jsonrpc/src/test/java/io/a2a/server/apps/quarkus/A2ATestRoutes.java
@@ -185,6 +185,20 @@ public void saveTaskPushNotificationConfig(@Param String taskId, @Body String bo
}
}
+ @Route(path = "/test/queue/awaitChildCountStable/:taskId/:expectedCount/:timeoutMs", methods = {Route.HttpMethod.POST}, type = Route.HandlerType.BLOCKING)
+ public void awaitChildQueueCountStable(@Param("taskId") String taskId, @Param("expectedCount") String expectedCountStr, @Param("timeoutMs") String timeoutMsStr, RoutingContext rc) {
+ try {
+ int expectedCount = Integer.parseInt(expectedCountStr);
+ long timeoutMs = Long.parseLong(timeoutMsStr);
+ boolean stable = testUtilsBean.awaitChildQueueCountStable(taskId, expectedCount, timeoutMs);
+ rc.response()
+ .setStatusCode(200)
+ .end(String.valueOf(stable));
+ } catch (Throwable t) {
+ errorResponse(t, rc);
+ }
+ }
+
private void errorResponse(Throwable t, RoutingContext rc) {
t.printStackTrace();
rc.response()
diff --git a/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java b/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java
index 57d6965a4..dcd9d58d6 100644
--- a/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java
+++ b/reference/rest/src/test/java/io/a2a/server/rest/quarkus/A2ATestRoutes.java
@@ -185,6 +185,20 @@ public void saveTaskPushNotificationConfig(@Param String taskId, @Body String bo
}
}
+ @Route(path = "/test/queue/awaitChildCountStable/:taskId/:expectedCount/:timeoutMs", methods = {Route.HttpMethod.POST}, type = Route.HandlerType.BLOCKING)
+ public void awaitChildQueueCountStable(@Param("taskId") String taskId, @Param("expectedCount") String expectedCountStr, @Param("timeoutMs") String timeoutMsStr, RoutingContext rc) {
+ try {
+ int expectedCount = Integer.parseInt(expectedCountStr);
+ long timeoutMs = Long.parseLong(timeoutMsStr);
+ boolean stable = testUtilsBean.awaitChildQueueCountStable(taskId, expectedCount, timeoutMs);
+ rc.response()
+ .setStatusCode(200)
+ .end(String.valueOf(stable));
+ } catch (Throwable t) {
+ errorResponse(t, rc);
+ }
+ }
+
private void errorResponse(Throwable t, RoutingContext rc) {
t.printStackTrace();
rc.response()
diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
index 5522e4dbf..897e310db 100644
--- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
+++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
@@ -1401,6 +1401,11 @@ public void testNonBlockingWithMultipleMessages() throws Exception {
assertTrue(subscriptionLatch.await(15, TimeUnit.SECONDS));
+ // CRITICAL: Wait for subscribeToTask's child queue poller to start
+ // This ensures it's ready to receive events from the upcoming sendMessage
+ assertTrue(awaitChildQueueCountStable(taskId, 1, 5000),
+ "subscribeToTask child queue should be created and stable");
+
// 3. Send second streaming message to same taskId
Message message2 = Message.builder(MESSAGE)
.taskId(multiEventTaskId) // Same taskId
@@ -1492,6 +1497,23 @@ public void testNonBlockingWithMultipleMessages() throws Exception {
}
}
+ private boolean awaitChildQueueCountStable(String taskId, int expectedCount, long timeoutMs) throws IOException, InterruptedException {
+ HttpClient client = HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .build();
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create("http://localhost:" + serverPort + "/test/queue/awaitChildCountStable/" +
+ taskId + "/" + expectedCount + "/" + timeoutMs))
+ .POST(HttpRequest.BodyPublishers.noBody())
+ .build();
+
+ HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
+ if (response.statusCode() != 200) {
+ throw new RuntimeException(response.statusCode() + ": Awaiting child queue count failed! " + response.body());
+ }
+ return Boolean.parseBoolean(response.body());
+ }
+
@Test
@Timeout(value = 1, unit = TimeUnit.MINUTES)
public void testInputRequiredWorkflow() throws Exception {
diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java
index 45483f214..997d3dbbe 100644
--- a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java
+++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestUtilsBean.java
@@ -61,4 +61,29 @@ public void deleteTaskPushNotificationConfig(String taskId, String configId) {
public void saveTaskPushNotificationConfig(String taskId, PushNotificationConfig notificationConfig) {
pushNotificationConfigStore.setInfo(taskId, notificationConfig);
}
+
+ public void awaitQueuePollerStart(String taskId) throws InterruptedException {
+ queueManager.awaitQueuePollerStart(queueManager.get(taskId));
+ }
+
+ public boolean awaitChildQueueCountStable(String taskId, int expectedCount, long timeoutMs) throws InterruptedException {
+ long endTime = System.currentTimeMillis() + timeoutMs;
+ int consecutiveMatches = 0;
+ final int requiredMatches = 3; // Count must match 3 times in a row (150ms) to be considered stable
+
+ while (System.currentTimeMillis() < endTime) {
+ int count = queueManager.getActiveChildQueueCount(taskId);
+ if (count == expectedCount) {
+ consecutiveMatches++;
+ if (consecutiveMatches >= requiredMatches) {
+ // Count is stable - all child queues exist and haven't closed
+ return true;
+ }
+ } else {
+ consecutiveMatches = 0; // Reset if count changes
+ }
+ Thread.sleep(50);
+ }
+ return false;
+ }
}
From ab477240155639ea0fea6fecbbda724aa261217d Mon Sep 17 00:00:00 2001
From: Kabir Khan
Date: Wed, 4 Feb 2026 16:05:45 +0000
Subject: [PATCH 2/3] feat: Implement PushNotifications as per the 1.0 spec
---
...otificationConfigStoreIntegrationTest.java | 30 +--
.../jpa/MockPushNotificationSender.java | 23 +-
.../server/events/MainEventBusProcessor.java | 41 ++--
.../tasks/BasePushNotificationSender.java | 52 ++++-
.../server/tasks/PushNotificationSender.java | 36 +++-
.../AbstractA2ARequestHandlerTest.java | 11 +-
.../tasks/PushNotificationSenderTest.java | 201 +++++++++++++++---
.../grpc/handler/GrpcHandlerTest.java | 47 ++--
.../jsonrpc/handler/JSONRPCHandlerTest.java | 48 +++--
9 files changed, 347 insertions(+), 142 deletions(-)
diff --git a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java
index 0366e236a..96f8f3adc 100644
--- a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java
+++ b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStoreIntegrationTest.java
@@ -151,11 +151,12 @@ public void testJpaDatabasePushNotificationConfigStoreIntegration() throws Excep
assertTrue(updateLatch.await(10, TimeUnit.SECONDS), "Timeout waiting for task update");
// Step 5: Poll for the async notification to be captured
+ // With the new StreamingEventKind support, we receive all event types (Task, Message, TaskArtifactUpdateEvent, etc.)
long end = System.currentTimeMillis() + 5000;
boolean notificationReceived = false;
while (System.currentTimeMillis() < end) {
- if (!mockPushNotificationSender.getCapturedTasks().isEmpty()) {
+ if (!mockPushNotificationSender.getCapturedEvents().isEmpty()) {
notificationReceived = true;
break;
}
@@ -165,17 +166,22 @@ public void testJpaDatabasePushNotificationConfigStoreIntegration() throws Excep
assertTrue(notificationReceived, "Timeout waiting for push notification.");
// Step 6: Verify the captured notification
- Queue capturedTasks = mockPushNotificationSender.getCapturedTasks();
-
- // Verify the notification contains the correct task with artifacts
- Task notifiedTaskWithArtifact = capturedTasks.stream()
- .filter(t -> taskId.equals(t.id()) && t.artifacts() != null && t.artifacts().size() > 0)
- .findFirst()
- .orElse(null);
-
- assertNotNull(notifiedTaskWithArtifact, "Notification should contain the updated task with artifacts");
- assertEquals(taskId, notifiedTaskWithArtifact.id());
- assertEquals(1, notifiedTaskWithArtifact.artifacts().size(), "Task should have one artifact from the update");
+ // Check if we received events for this task (could be Task, TaskArtifactUpdateEvent, etc.)
+ Queue capturedEvents = mockPushNotificationSender.getCapturedEvents();
+
+ // Look for Task events with artifacts OR TaskArtifactUpdateEvent for this task
+ boolean hasTaskWithArtifact = capturedEvents.stream()
+ .filter(e -> e instanceof Task)
+ .map(e -> (Task) e)
+ .anyMatch(t -> taskId.equals(t.id()) && t.artifacts() != null && t.artifacts().size() > 0);
+
+ boolean hasArtifactUpdateEvent = capturedEvents.stream()
+ .filter(e -> e instanceof io.a2a.spec.TaskArtifactUpdateEvent)
+ .map(e -> (io.a2a.spec.TaskArtifactUpdateEvent) e)
+ .anyMatch(e -> taskId.equals(e.taskId()));
+
+ assertTrue(hasTaskWithArtifact || hasArtifactUpdateEvent,
+ "Notification should contain either Task with artifacts or TaskArtifactUpdateEvent for task " + taskId);
// Step 7: Clean up - delete the push notification configuration
client.deleteTaskPushNotificationConfigurations(
diff --git a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/MockPushNotificationSender.java b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/MockPushNotificationSender.java
index 0a6bba415..2275388a9 100644
--- a/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/MockPushNotificationSender.java
+++ b/extras/push-notification-config-store-database-jpa/src/test/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/MockPushNotificationSender.java
@@ -8,6 +8,7 @@
import jakarta.enterprise.inject.Alternative;
import io.a2a.server.tasks.PushNotificationSender;
+import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
/**
@@ -19,18 +20,30 @@
@Priority(100)
public class MockPushNotificationSender implements PushNotificationSender {
- private final Queue capturedTasks = new ConcurrentLinkedQueue<>();
+ private final Queue capturedEvents = new ConcurrentLinkedQueue<>();
@Override
- public void sendNotification(Task task) {
- capturedTasks.add(task);
+ public void sendNotification(StreamingEventKind event) {
+ capturedEvents.add(event);
}
+ public Queue getCapturedEvents() {
+ return capturedEvents;
+ }
+
+ /**
+ * For backward compatibility - provides access to Task events only.
+ */
public Queue getCapturedTasks() {
- return capturedTasks;
+ Queue tasks = new ConcurrentLinkedQueue<>();
+ capturedEvents.stream()
+ .filter(e -> e instanceof Task)
+ .map(e -> (Task) e)
+ .forEach(tasks::add);
+ return tasks;
}
public void clear() {
- capturedTasks.clear();
+ capturedEvents.clear();
}
}
diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
index 8b3dc6fa3..19d11b819 100644
--- a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
+++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
@@ -16,6 +16,7 @@
import io.a2a.spec.InternalError;
import io.a2a.spec.Message;
import io.a2a.spec.Task;
+import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskStatusUpdateEvent;
import org.slf4j.Logger;
@@ -211,16 +212,11 @@ private void processEvent(MainEventBusContext context) {
// Step 2: Send push notification AFTER successful persistence (only from active node)
// Skip push notifications for replicated events to avoid duplicate notifications in multi-instance deployments
- if (eventToDistribute == event && !isReplicated) {
- // Capture task state immediately after persistence, before going async
- // This ensures we send the task as it existed when THIS event was processed,
- // not whatever state might exist later when the async callback executes
- Task taskSnapshot = taskStore.get(taskId);
- if (taskSnapshot != null) {
- sendPushNotification(taskId, taskSnapshot);
- } else {
- LOGGER.warn("Task {} not found in TaskStore after successful persistence, skipping push notification", taskId);
- }
+ // Push notifications are sent for all StreamingEventKind events (Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent)
+ // per A2A spec section 4.3.3
+ if (eventToDistribute == event && !isReplicated && event instanceof StreamingEventKind streamingEvent) {
+ // Send the streaming event directly - it will be wrapped in StreamResponse format by PushNotificationSender
+ sendPushNotification(taskId, streamingEvent);
}
// Step 3: Then distribute to ChildQueues (clients see either event or error AFTER persistence attempt)
@@ -304,7 +300,7 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated
}
/**
- * Sends push notification for the task AFTER persistence.
+ * Sends push notification for the streaming event AFTER persistence.
*
* This is called after updateTaskStore() to ensure the notification contains
* the latest persisted state, avoiding race conditions.
@@ -315,10 +311,15 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated
* PushNotificationSender.sendNotification() was causing streaming delays.
*
*
- * IMPORTANT: The task parameter is a snapshot captured immediately after
- * persistence. This ensures we send the task state as it existed when THIS event
- * was processed, not whatever state might exist in TaskStore when the async
- * callback executes (subsequent events may have already updated the store).
+ * IMPORTANT: The event parameter is the actual event being processed.
+ * This ensures we send the event as it was when processed, not whatever state
+ * might exist in TaskStore when the async callback executes (subsequent events
+ * may have already updated the store).
+ *
+ *
+ * Supports all StreamingEventKind event types per A2A spec section 4.3.3:
+ * Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent.
+ * The event will be automatically wrapped in StreamResponse format by JsonUtil.
*
*
* NOTE: Tests can inject a synchronous executor via setPushNotificationExecutor()
@@ -326,16 +327,16 @@ private boolean updateTaskStore(String taskId, Event event, boolean isReplicated
*
*
* @param taskId the task ID
- * @param task the task snapshot to send (captured immediately after persistence)
+ * @param event the streaming event to send (Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent)
*/
- private void sendPushNotification(String taskId, Task task) {
+ private void sendPushNotification(String taskId, StreamingEventKind event) {
Runnable pushTask = () -> {
try {
- if (task != null) {
+ if (event != null) {
LOGGER.debug("Sending push notification for task {}", taskId);
- pushSender.sendNotification(task);
+ pushSender.sendNotification(event);
} else {
- LOGGER.debug("Skipping push notification - task snapshot is null for task {}", taskId);
+ LOGGER.debug("Skipping push notification - event is null for task {}", taskId);
}
} catch (Exception e) {
LOGGER.error("Error sending push notification for task {}", taskId, e);
diff --git a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java
index d6d4a2369..82aa6ce68 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java
@@ -5,6 +5,7 @@
import static io.a2a.common.A2AHeaders.X_A2A_NOTIFICATION_TOKEN;
import io.a2a.spec.TaskPushNotificationConfig;
+import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -20,8 +21,12 @@
import io.a2a.jsonrpc.common.json.JsonUtil;
import io.a2a.spec.ListTaskPushNotificationConfigParams;
import io.a2a.spec.ListTaskPushNotificationConfigResult;
+import io.a2a.spec.Message;
import io.a2a.spec.PushNotificationConfig;
+import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
+import io.a2a.spec.TaskStatusUpdateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,11 +67,17 @@ public BasePushNotificationSender(PushNotificationConfigStore configStore, A2AHt
}
@Override
- public void sendNotification(Task task) {
+ public void sendNotification(StreamingEventKind event) {
+ String taskId = extractTaskId(event);
+ if (taskId == null) {
+ LOGGER.warn("Cannot send push notification: event does not contain taskId");
+ return;
+ }
+
List configs = new ArrayList<>();
String nextPageToken = null;
do {
- ListTaskPushNotificationConfigResult pageResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(task.id(),
+ ListTaskPushNotificationConfigResult pageResult = configStore.getInfo(new ListTaskPushNotificationConfigParams(taskId,
DEFAULT_PAGE_SIZE, nextPageToken, ""));
if (!pageResult.configs().isEmpty()) {
configs.addAll(pageResult.configs());
@@ -76,7 +87,7 @@ public void sendNotification(Task task) {
List> dispatchResults = configs
.stream()
- .map(pushConfig -> dispatch(task, pushConfig.pushNotificationConfig()))
+ .map(pushConfig -> dispatch(event, pushConfig.pushNotificationConfig()))
.toList();
CompletableFuture allFutures = CompletableFuture.allOf(dispatchResults.toArray(new CompletableFuture[0]));
CompletableFuture dispatchResult = allFutures.thenApply(v -> dispatchResults.stream()
@@ -84,18 +95,37 @@ public void sendNotification(Task task) {
try {
boolean allSent = dispatchResult.get();
if (!allSent) {
- LOGGER.warn("Some push notifications failed to send for taskId: " + task.id());
+ LOGGER.warn("Some push notifications failed to send for taskId: " + taskId);
}
} catch (InterruptedException | ExecutionException e) {
- LOGGER.warn("Some push notifications failed to send for taskId " + task.id() + ": {}", e.getMessage(), e);
+ LOGGER.warn("Some push notifications failed to send for taskId " + taskId + ": {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Extracts the task ID from a StreamingEventKind event.
+ *
+ * @param event the streaming event
+ * @return the task ID, or null if not available
+ */
+ private @Nullable String extractTaskId(StreamingEventKind event) {
+ if (event instanceof Task task) {
+ return task.id();
+ } else if (event instanceof Message message) {
+ return message.taskId();
+ } else if (event instanceof TaskStatusUpdateEvent statusUpdate) {
+ return statusUpdate.taskId();
+ } else if (event instanceof TaskArtifactUpdateEvent artifactUpdate) {
+ return artifactUpdate.taskId();
}
+ return null;
}
- private CompletableFuture dispatch(Task task, PushNotificationConfig pushInfo) {
- return CompletableFuture.supplyAsync(() -> dispatchNotification(task, pushInfo));
+ private CompletableFuture dispatch(StreamingEventKind event, PushNotificationConfig pushInfo) {
+ return CompletableFuture.supplyAsync(() -> dispatchNotification(event, pushInfo));
}
- private boolean dispatchNotification(Task task, PushNotificationConfig pushInfo) {
+ private boolean dispatchNotification(StreamingEventKind event, PushNotificationConfig pushInfo) {
String url = pushInfo.url();
String token = pushInfo.token();
@@ -106,9 +136,11 @@ private boolean dispatchNotification(Task task, PushNotificationConfig pushInfo)
String body;
try {
- body = JsonUtil.toJson(task);
+ // JsonUtil.toJson automatically wraps StreamingEventKind in StreamResponse format
+ // (task/message/statusUpdate/artifactUpdate) per A2A spec section 4.3.3
+ body = JsonUtil.toJson(event);
} catch (Throwable throwable) {
- LOGGER.debug("Error writing value as string: {}", throwable.getMessage(), throwable);
+ LOGGER.debug("Error serializing StreamingEventKind to JSON: {}", throwable.getMessage(), throwable);
return false;
}
diff --git a/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java b/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java
index 2013d6a22..ef54266a5 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java
@@ -1,5 +1,6 @@
package io.a2a.server.tasks;
+import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
/**
@@ -27,7 +28,8 @@
* {@link BasePushNotificationSender} provides HTTP webhook delivery:
*
* - Retrieves webhook URLs from {@link PushNotificationConfigStore}
- * - Sends HTTP POST requests with task JSON payload
+ * - Wraps events in StreamResponse format (per A2A spec section 4.3.3)
+ * - Sends HTTP POST requests with StreamResponse JSON payload
* - Logs errors but doesn't fail the request
*
*
@@ -47,11 +49,12 @@
* @Priority(100)
* public class KafkaPushNotificationSender implements PushNotificationSender {
* @Inject
- * KafkaProducer producer;
+ * KafkaProducer producer;
*
* @Override
- * public void sendNotification(Task task) {
- * producer.send("task-updates", task.id(), task);
+ * public void sendNotification(StreamingEventKind event) {
+ * String taskId = extractTaskId(event);
+ * producer.send("task-updates", taskId, event);
* }
* }
* }
@@ -78,18 +81,31 @@
public interface PushNotificationSender {
/**
- * Sends a push notification containing the latest task state.
+ * Sends a push notification containing a streaming event.
*
- * Called after the task has been persisted to {@link TaskStore}. Retrieve push
- * notification URLs or messaging configurations from {@link PushNotificationConfigStore}
- * using {@code task.id()}.
+ * Called after the event has been persisted to {@link TaskStore}. The event is wrapped
+ * in a StreamResponse format (per A2A spec section 4.3.3) with the appropriate oneof
+ * field set (task, message, statusUpdate, or artifactUpdate).
+ *
+ *
+ * Retrieve push notification URLs or messaging configurations from
+ * {@link PushNotificationConfigStore} using the task ID extracted from the event.
+ *
+ *
+ * Supported event types:
+ *
+ * - {@link Task} - wrapped in StreamResponse.task
+ * - {@link io.a2a.spec.Message} - wrapped in StreamResponse.message
+ * - {@link io.a2a.spec.TaskStatusUpdateEvent} - wrapped in StreamResponse.statusUpdate
+ * - {@link io.a2a.spec.TaskArtifactUpdateEvent} - wrapped in StreamResponse.artifactUpdate
+ *
*
*
* Error Handling: Log errors but don't throw exceptions. Notifications are
* best-effort and should not fail the primary request.
*
*
- * @param task the task with current state and artifacts to send
+ * @param event the streaming event to send (Task, Message, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent)
*/
- void sendNotification(Task task);
+ void sendNotification(StreamingEventKind event);
}
diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
index 968937b89..e428c679e 100644
--- a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
+++ b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
@@ -41,6 +41,7 @@
import io.a2a.spec.AgentInterface;
import io.a2a.spec.Event;
import io.a2a.spec.Message;
+import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
@@ -189,7 +190,7 @@ public boolean isReplicated() {
@Dependent
@IfBuildProfile("test")
protected static class TestHttpClient implements A2AHttpClient {
- public final List tasks = Collections.synchronizedList(new ArrayList<>());
+ public final List events = Collections.synchronizedList(new ArrayList<>());
public volatile CountDownLatch latch;
@Override
@@ -218,8 +219,10 @@ public PostBuilder body(String body) {
@Override
public A2AHttpResponse post() throws IOException, InterruptedException {
try {
- Task task = JsonUtil.fromJson(body, Task.class);
- tasks.add(task);
+ // Parse StreamResponse format to extract the streaming event
+ // The body contains a wrapper with one of: task, message, statusUpdate, artifactUpdate
+ StreamingEventKind event = JsonUtil.fromJson(body, StreamingEventKind.class);
+ events.add(event);
return new A2AHttpResponse() {
@Override
public int status() {
@@ -237,7 +240,7 @@ public String body() {
}
};
} catch (JsonProcessingException e) {
- throw new IOException("Failed to parse task JSON", e);
+ throw new IOException("Failed to parse StreamingEventKind JSON", e);
} finally {
if (latch != null) {
latch.countDown();
diff --git a/server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java b/server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java
index 7bb67f681..e58ad384b 100644
--- a/server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java
+++ b/server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java
@@ -21,10 +21,17 @@
import io.a2a.common.A2AHeaders;
import io.a2a.jsonrpc.common.json.JsonProcessingException;
import io.a2a.jsonrpc.common.json.JsonUtil;
+import io.a2a.spec.Artifact;
+import io.a2a.spec.Message;
+import io.a2a.spec.Part;
import io.a2a.spec.PushNotificationConfig;
+import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
+import io.a2a.spec.TaskStatusUpdateEvent;
+import io.a2a.spec.TextPart;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -35,12 +42,14 @@ public class PushNotificationSenderTest {
private BasePushNotificationSender sender;
/**
- * Simple test implementation of A2AHttpClient that captures HTTP calls for verification
+ * Simple test implementation of A2AHttpClient that captures HTTP calls for verification.
+ * Now captures StreamingEventKind events wrapped in StreamResponse format.
*/
private static class TestHttpClient implements A2AHttpClient {
- final List tasks = Collections.synchronizedList(new ArrayList<>());
+ final List events = Collections.synchronizedList(new ArrayList<>());
final List urls = Collections.synchronizedList(new ArrayList<>());
final List