Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions vertexai/_genai/_agent_engines_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,9 @@ def _generate_class_methods_spec_or_raise(
class_method = _to_proto(schema_dict)
class_method[_MODE_KEY_IN_SCHEMA] = mode
if hasattr(agent, "agent_card"):
class_method[_A2A_AGENT_CARD] = getattr(
agent, "agent_card"
).model_dump_json()
class_method[_A2A_AGENT_CARD] = json_format.MessageToJson(
getattr(agent, "agent_card")
)
class_methods_spec.append(class_method)

return class_methods_spec
Expand Down Expand Up @@ -1234,9 +1234,16 @@ def _upload_agent_engine(
cloudpickle.dump(agent, f)
except Exception as e:
url = "https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/develop/custom#deployment-considerations"
raise TypeError(
f"Failed to serialize agent engine. Visit {url} for details."
) from e
error_msg = f"Failed to serialize agent engine. Visit {url} for details."
if "google._upb._message" in str(e) or "Descriptor" in str(e):
error_msg += (
" This is often caused by protobuf objects (like Part, AgentCard) "
"being imported at the global module level. Please move these "
"imports inside the functions or methods where they are used. "
"Alternatively, you can import the entire module: "
"`from a2a import types`."
)
raise TypeError(error_msg) from e
with blob.open("rb") as f:
try:
_ = cloudpickle.load(f)
Expand Down Expand Up @@ -1796,13 +1803,6 @@ async def _method(self, **kwargs) -> Any: # type: ignore[no-untyped-def]
if not hasattr(a2a_agent_card, "preferred_transport"):
a2a_agent_card.preferred_transport = TransportProtocol.http_json

# AE cannot support streaming yet. Turn off streaming for now.
if a2a_agent_card.capabilities and a2a_agent_card.capabilities.streaming:
raise ValueError(
"Streaming is not supported in Agent Engine, please change "
"a2a_agent_card.capabilities.streaming to False."
)

if not hasattr(a2a_agent_card.capabilities, "streaming"):
a2a_agent_card.capabilities.streaming = False

Expand Down
9 changes: 6 additions & 3 deletions vertexai/_genai/agent_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -1834,10 +1834,13 @@ def _create_config(
agent_card = getattr(agent, "agent_card")
if agent_card:
try:
agent_engine_spec["agent_card"] = agent_card.model_dump(
exclude_none=True
from google.protobuf import json_format
import json

agent_engine_spec["agent_card"] = json.loads(
json_format.MessageToJson(agent_card)
)
except TypeError as e:
except Exception as e:
raise ValueError(
f"Failed to convert agent card to dict (serialization error): {e}"
) from e
Expand Down
52 changes: 37 additions & 15 deletions vertexai/agent_engines/_agent_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,28 @@
try:
from a2a.types import (
AgentCard,
TransportProtocol,
AgentInterface,
Message,
TaskIdParams,
TaskQueryParams,
)
from a2a.utils.constants import TransportProtocol, PROTOCOL_VERSION_CURRENT
from a2a.client import ClientConfig, ClientFactory

AgentCard = AgentCard
AgentInterface = AgentInterface
TransportProtocol = TransportProtocol
PROTOCOL_VERSION_CURRENT = PROTOCOL_VERSION_CURRENT
Message = Message
ClientConfig = ClientConfig
ClientFactory = ClientFactory
TaskIdParams = TaskIdParams
TaskQueryParams = TaskQueryParams
except (ImportError, AttributeError):
AgentCard = None
AgentInterface = None
TransportProtocol = None
PROTOCOL_VERSION_CURRENT = None
Message = None
ClientConfig = None
ClientFactory = None
Expand Down Expand Up @@ -1216,9 +1221,16 @@ def _upload_agent_engine(
cloudpickle.dump(agent_engine, f)
except Exception as e:
url = "https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/develop/custom#deployment-considerations"
raise TypeError(
f"Failed to serialize agent engine. Visit {url} for details."
) from e
error_msg = f"Failed to serialize agent engine. Visit {url} for details."
if "google._upb._message" in str(e) or "Descriptor" in str(e):
error_msg += (
" This is often caused by protobuf objects (like Part, AgentCard) "
"being imported at the global module level. Please move these "
"imports inside the functions or methods where they are used. "
"Alternatively, you can import the entire module: "
"`from a2a import types as a2a_types`."
)
raise TypeError(error_msg) from e
with blob.open("rb") as f:
try:
_ = cloudpickle.load(f)
Expand Down Expand Up @@ -1736,16 +1748,23 @@ async def _method(self, **kwargs) -> Any:

# A2A + AE integration currently only supports Rest API.
if (
a2a_agent_card.preferred_transport
and a2a_agent_card.preferred_transport != TransportProtocol.http_json
a2a_agent_card.supported_interfaces
and a2a_agent_card.supported_interfaces[0].protocol_binding
!= TransportProtocol.HTTP_JSON
):
raise ValueError(
"Only HTTP+JSON is supported for preferred transport on agent card "
"Only HTTP+JSON is supported for primary interface on agent card "
)

# Set preferred transport to HTTP+JSON if not set.
if not hasattr(a2a_agent_card, "preferred_transport"):
a2a_agent_card.preferred_transport = TransportProtocol.http_json
# Set primary interface to HTTP+JSON if not set.
if not a2a_agent_card.supported_interfaces:
a2a_agent_card.supported_interfaces = []
a2a_agent_card.supported_interfaces.append(
AgentInterface(
protocol_binding=TransportProtocol.HTTP_JSON,
protocol_version=PROTOCOL_VERSION_CURRENT,
)
)

# AE cannot support streaming yet. Turn off streaming for now.
if a2a_agent_card.capabilities and a2a_agent_card.capabilities.streaming:
Expand All @@ -1759,12 +1778,13 @@ async def _method(self, **kwargs) -> Any:

# agent_card is set on the class_methods before set_up is invoked.
# Ensure that the agent_card url is set correctly before the client is created.
a2a_agent_card.url = f"https://{initializer.global_config.api_endpoint}/v1beta1/{self.resource_name}/a2a"
url = f"https://{initializer.global_config.api_endpoint}/v1beta1/{self.resource_name}/a2a"
a2a_agent_card.supported_interfaces[0].url = url

# Using a2a client, inject the auth token from the global config.
config = ClientConfig(
supported_transports=[
TransportProtocol.http_json,
TransportProtocol.HTTP_JSON,
],
use_client_preference=True,
httpx_client=httpx.AsyncClient(
Expand Down Expand Up @@ -1977,9 +1997,11 @@ def _generate_class_methods_spec_or_raise(
class_method[_MODE_KEY_IN_SCHEMA] = mode
# A2A agent card is a special case, when running in A2A mode,
if hasattr(agent_engine, "agent_card"):
class_method[_A2A_AGENT_CARD] = getattr(
agent_engine, "agent_card"
).model_dump_json()
from google.protobuf import json_format

class_method[_A2A_AGENT_CARD] = json_format.MessageToJson(
getattr(agent_engine, "agent_card")
)
class_methods_spec.append(class_method)

return class_methods_spec
Expand Down
Loading
Loading