Skip to content

Commit 4b388cb

Browse files
vertex-sdk-botcopybara-github
authored andcommitted
feat: enable a2a streaming for agents deployed to Agent Engine.
PiperOrigin-RevId: 882204780
1 parent 0d6c54e commit 4b388cb

1 file changed

Lines changed: 26 additions & 4 deletions

File tree

  • vertexai/preview/reasoning_engines/templates

vertexai/preview/reasoning_engines/templates/a2a.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import os
1818
from typing import Any, Callable, Dict, List, Mapping, Optional, TYPE_CHECKING
19+
from collections.abc import AsyncIterator
1920

2021

2122
if TYPE_CHECKING:
@@ -55,6 +56,7 @@ def create_agent_card(
5556
agent_card: Optional[Dict[str, Any]] = None,
5657
default_input_modes: Optional[List[str]] = None,
5758
default_output_modes: Optional[List[str]] = None,
59+
streaming: bool = False,
5860
) -> "AgentCard":
5961
"""Creates an AgentCard object.
6062
@@ -73,6 +75,8 @@ def create_agent_card(
7375
default to ["text/plain"].
7476
default_output_modes (Optional[List[str]]): A list of output modes,
7577
default to ["application/json"].
78+
streaming (bool): Whether to enable streaming for the agent.
79+
Defaults to False.
7680
7781
Returns:
7882
AgentCard: A fully constructed AgentCard object.
@@ -96,8 +100,7 @@ def create_agent_card(
96100
version="1.0.0",
97101
default_input_modes=default_input_modes or ["text/plain"],
98102
default_output_modes=default_output_modes or ["application/json"],
99-
# Agent Engine does not support streaming yet
100-
capabilities=AgentCapabilities(streaming=False),
103+
capabilities=AgentCapabilities(streaming=streaming),
101104
skills=skills,
102105
preferred_transport=TransportProtocol.http_json, # Http Only.
103106
supports_authenticated_extended_card=True,
@@ -185,8 +188,6 @@ def __init__(
185188
raise ValueError(
186189
"Only HTTP+JSON is supported for preferred transport on agent card "
187190
)
188-
if agent_card.capabilities and agent_card.capabilities.streaming:
189-
raise ValueError("Streaming is not supported by Agent Engine")
190191

191192
self._tmpl_attrs: dict[str, Any] = {
192193
"project": initializer.global_config.project,
@@ -334,6 +335,27 @@ def register_operations(self) -> Dict[str, List[str]]:
334335
"on_cancel_task",
335336
]
336337
}
338+
if self.agent_card.capabilities and self.agent_card.capabilities.streaming:
339+
routes["a2a_extension"].append("on_message_send_stream")
340+
routes["a2a_extension"].append("on_resubscribe_to_task")
337341
if self.agent_card.supports_authenticated_extended_card:
338342
routes["a2a_extension"].append("handle_authenticated_agent_card")
339343
return routes
344+
345+
async def on_message_send_stream(
346+
self,
347+
request: "Request",
348+
context: "ServerCallContext",
349+
) -> AsyncIterator[str]:
350+
"""Handles A2A streaming requests via SSE."""
351+
async for chunk in self.rest_handler.on_message_send_stream(request, context):
352+
yield chunk
353+
354+
async def on_resubscribe_to_task(
355+
self,
356+
request: "Request",
357+
context: "ServerCallContext",
358+
) -> AsyncIterator[str]:
359+
"""Handles A2A task resubscription requests via SSE."""
360+
async for chunk in self.rest_handler.on_resubscribe_to_task(request, context):
361+
yield chunk

0 commit comments

Comments
 (0)