-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Python: Fix A2AAgent to surface message content from in-progress TaskStatusUpdateEvents #4798
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -91,9 +91,18 @@ def add_in_progress_task_response( | |||||||||||
| task_id: str, | ||||||||||||
| context_id: str = "test-context", | ||||||||||||
| state: TaskState = TaskState.working, | ||||||||||||
| text: str | None = None, | ||||||||||||
giles17 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
| role: A2ARole = A2ARole.agent, | ||||||||||||
| ) -> None: | ||||||||||||
| """Add a mock in-progress Task response (non-terminal).""" | ||||||||||||
| status = TaskStatus(state=state, message=None) | ||||||||||||
| message = None | ||||||||||||
| if text is not None: | ||||||||||||
| message = A2AMessage( | ||||||||||||
| message_id=str(uuid4()), | ||||||||||||
| role=role, | ||||||||||||
| parts=[Part(root=TextPart(text=text))], | ||||||||||||
| ) | ||||||||||||
| status = TaskStatus(state=state, message=message) | ||||||||||||
| task = Task(id=task_id, context_id=context_id, status=status) | ||||||||||||
| client_event = (task, None) | ||||||||||||
| self.responses.append(client_event) | ||||||||||||
|
|
@@ -102,9 +111,10 @@ async def send_message(self, message: Any) -> AsyncIterator[Any]: | |||||||||||
| """Mock send_message method that yields responses.""" | ||||||||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The semantics changed from 'pop one response per
Suggested change
|
||||||||||||
| self.call_count += 1 | ||||||||||||
|
|
||||||||||||
| if self.responses: | ||||||||||||
| response = self.responses.pop(0) | ||||||||||||
| # All queued responses are delivered as a single streaming batch per call. | ||||||||||||
| for response in self.responses: | ||||||||||||
| yield response | ||||||||||||
| self.responses.clear() | ||||||||||||
|
|
||||||||||||
| async def resubscribe(self, request: Any) -> AsyncIterator[Any]: | ||||||||||||
| """Mock resubscribe method that yields responses.""" | ||||||||||||
|
|
@@ -1039,3 +1049,144 @@ async def test_run_with_continuation_token_does_not_require_messages(mock_a2a_cl | |||||||||||
|
|
||||||||||||
|
|
||||||||||||
| # endregion | ||||||||||||
|
|
||||||||||||
| # region Streaming with in-progress message content | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| async def test_streaming_working_updates_yield_message_content( | ||||||||||||
| a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient | ||||||||||||
| ) -> None: | ||||||||||||
| """Test that streaming working updates with status.message yield content.""" | ||||||||||||
| mock_a2a_client.add_in_progress_task_response("task-w", context_id="ctx-w", text="Processing step 1...") | ||||||||||||
| mock_a2a_client.add_in_progress_task_response("task-w", context_id="ctx-w", text="Processing step 2...") | ||||||||||||
| mock_a2a_client.add_task_response("task-w", [{"id": "art-w", "content": "Final result"}]) | ||||||||||||
|
|
||||||||||||
| updates: list[AgentResponseUpdate] = [] | ||||||||||||
| async for update in a2a_agent.run("Hello", stream=True): | ||||||||||||
| updates.append(update) | ||||||||||||
|
|
||||||||||||
| assert len(updates) == 3 | ||||||||||||
| assert updates[0].contents[0].text == "Processing step 1..." | ||||||||||||
| assert updates[1].contents[0].text == "Processing step 2..." | ||||||||||||
| assert updates[2].contents[0].text == "Final result" | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| async def test_streaming_single_working_update_with_message( | ||||||||||||
| a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient | ||||||||||||
| ) -> None: | ||||||||||||
| """Test that a single working update with message content is not dropped.""" | ||||||||||||
| mock_a2a_client.add_in_progress_task_response("task-s", context_id="ctx-s", text="Thinking...") | ||||||||||||
| mock_a2a_client.add_task_response("task-s", [{"id": "art-s", "content": "Done"}]) | ||||||||||||
|
|
||||||||||||
| updates: list[AgentResponseUpdate] = [] | ||||||||||||
| async for update in a2a_agent.run("Hello", stream=True): | ||||||||||||
| updates.append(update) | ||||||||||||
|
|
||||||||||||
| assert len(updates) == 2 | ||||||||||||
| assert updates[0].contents[0].text == "Thinking..." | ||||||||||||
| assert updates[0].role == "assistant" | ||||||||||||
| assert updates[1].contents[0].text == "Done" | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| async def test_streaming_working_update_without_message_is_skipped( | ||||||||||||
| a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient | ||||||||||||
| ) -> None: | ||||||||||||
| """Test that working updates without status.message are still silently skipped.""" | ||||||||||||
| mock_a2a_client.add_in_progress_task_response("task-n", context_id="ctx-n") | ||||||||||||
| mock_a2a_client.add_task_response("task-n", [{"id": "art-n", "content": "Result"}]) | ||||||||||||
|
|
||||||||||||
| updates: list[AgentResponseUpdate] = [] | ||||||||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test verifies |
||||||||||||
| async for update in a2a_agent.run("Hello", stream=True): | ||||||||||||
| updates.append(update) | ||||||||||||
|
|
||||||||||||
| assert len(updates) == 1 | ||||||||||||
| assert updates[0].contents[0].text == "Result" | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| async def test_streaming_working_update_user_role_mapping(a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient) -> None: | ||||||||||||
| """Test that A2ARole.user in status message maps to role='user'.""" | ||||||||||||
| mock_a2a_client.add_in_progress_task_response("task-u", context_id="ctx-u", text="User echo", role=A2ARole.user) | ||||||||||||
| mock_a2a_client.add_task_response("task-u", [{"id": "art-u", "content": "Done"}]) | ||||||||||||
|
|
||||||||||||
| updates: list[AgentResponseUpdate] = [] | ||||||||||||
| async for update in a2a_agent.run("Hello", stream=True): | ||||||||||||
| updates.append(update) | ||||||||||||
|
|
||||||||||||
| assert len(updates) == 2 | ||||||||||||
| assert updates[0].contents[0].text == "User echo" | ||||||||||||
| assert updates[0].role == "user" | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| async def test_background_with_status_message_yields_continuation_token( | ||||||||||||
| a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient | ||||||||||||
| ) -> None: | ||||||||||||
| """Test that background=True takes precedence over status message content.""" | ||||||||||||
| mock_a2a_client.add_in_progress_task_response("task-bg", context_id="ctx-bg", text="Should be ignored") | ||||||||||||
|
|
||||||||||||
| updates: list[AgentResponseUpdate] = [] | ||||||||||||
| async for update in a2a_agent.run("Hello", stream=True, background=True): | ||||||||||||
| updates.append(update) | ||||||||||||
|
|
||||||||||||
| assert len(updates) == 1 | ||||||||||||
| assert updates[0].continuation_token is not None | ||||||||||||
| assert updates[0].continuation_token["task_id"] == "task-bg" | ||||||||||||
| assert updates[0].contents == [] | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| async def test_non_streaming_does_not_surface_intermediate_messages( | ||||||||||||
| a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient | ||||||||||||
| ) -> None: | ||||||||||||
| """Test that run(stream=False) does not include intermediate status messages.""" | ||||||||||||
| mock_a2a_client.add_in_progress_task_response("task-ns", context_id="ctx-ns", text="Intermediate") | ||||||||||||
| mock_a2a_client.add_task_response("task-ns", [{"id": "art-ns", "content": "Final"}]) | ||||||||||||
|
|
||||||||||||
| response = await a2a_agent.run("Hello") | ||||||||||||
|
|
||||||||||||
| assert len(response.messages) == 1 | ||||||||||||
| assert response.messages[0].text == "Final" | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| async def test_terminal_no_artifacts_after_working_with_content( | ||||||||||||
| a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient | ||||||||||||
| ) -> None: | ||||||||||||
| """Test that a terminal task with no artifacts after working-state messages does not re-emit the working content.""" | ||||||||||||
| mock_a2a_client.add_in_progress_task_response("task-t", context_id="ctx-t", text="Working on it...") | ||||||||||||
| # Terminal task with no artifacts and no history | ||||||||||||
| status = TaskStatus(state=TaskState.completed, message=None) | ||||||||||||
| task = Task(id="task-t", context_id="ctx-t", status=status) | ||||||||||||
| mock_a2a_client.responses.append((task, None)) | ||||||||||||
|
|
||||||||||||
| updates: list[AgentResponseUpdate] = [] | ||||||||||||
| async for update in a2a_agent.run("Hello", stream=True): | ||||||||||||
| updates.append(update) | ||||||||||||
|
|
||||||||||||
| assert len(updates) == 2 | ||||||||||||
| assert updates[0].contents[0].text == "Working on it..." | ||||||||||||
| # Terminal task with no artifacts yields an empty-contents update | ||||||||||||
| assert updates[1].contents == [] | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| async def test_streaming_working_update_with_empty_parts_is_skipped( | ||||||||||||
| a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient | ||||||||||||
| ) -> None: | ||||||||||||
| """Test that a working update with status.message but empty parts list is skipped.""" | ||||||||||||
| # Construct a message with an empty parts list (distinct from message=None) | ||||||||||||
| message = A2AMessage( | ||||||||||||
| message_id=str(uuid4()), | ||||||||||||
| role=A2ARole.agent, | ||||||||||||
| parts=[], | ||||||||||||
| ) | ||||||||||||
| status = TaskStatus(state=TaskState.working, message=message) | ||||||||||||
| task = Task(id="task-ep", context_id="ctx-ep", status=status) | ||||||||||||
| mock_a2a_client.responses.append((task, None)) | ||||||||||||
| mock_a2a_client.add_task_response("task-ep", [{"id": "art-ep", "content": "Result"}]) | ||||||||||||
|
|
||||||||||||
| updates: list[AgentResponseUpdate] = [] | ||||||||||||
| async for update in a2a_agent.run("Hello", stream=True): | ||||||||||||
| updates.append(update) | ||||||||||||
|
|
||||||||||||
| assert len(updates) == 1 | ||||||||||||
| assert updates[0].contents[0].text == "Result" | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| # endregion | ||||||||||||
giles17 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
_update_event(TaskStatusUpdateEvent) is still discarded. The fix readstask.status.messageas a proxy, but the event's ownstatus.messageis the authoritative payload for this streaming frame. If a real client provides a non-None event whose status diverges from the task snapshot (e.g., stale snapshot), intermediate content will be silently wrong. Prefer reading from the event when available:source = _update_event.status if _update_event is not None else task.status.