Skip to content

Python: Fix A2AAgent to surface message content from in-progress TaskStatusUpdateEvents#4798

Open
giles17 wants to merge 3 commits intomicrosoft:mainfrom
giles17:agent/fix-4783-1
Open

Python: Fix A2AAgent to surface message content from in-progress TaskStatusUpdateEvents#4798
giles17 wants to merge 3 commits intomicrosoft:mainfrom
giles17:agent/fix-4783-1

Conversation

@giles17
Copy link
Contributor

@giles17 giles17 commented Mar 19, 2026

Motivation and Context

When an A2A remote agent sent TaskStatusUpdateEvents in a working/submitted state with status.message content (e.g., intermediate streaming text), A2AAgent._parse_task_into_updates silently discarded those messages. This caused callers to lose intermediate streaming content during non-background task execution.

Fixes #4783

Description

The root cause was that _parse_task_into_updates only handled terminal task states and background continuation tokens, returning an empty list for all other in-progress updates — even when task.status.message carried meaningful text parts. The fix adds a check after the background-token branch: if the in-progress task's status has a non-empty message with parts, those parts are parsed into an AgentResponseUpdate and yielded to the caller. Three regression tests verify that working updates with message content are surfaced, that multiple intermediate messages stream correctly, and that working updates without messages remain silently skipped.

Contribution Checklist

  • The code builds clean without any errors or warnings
  • The PR follows the Contribution Guidelines
  • All unit tests pass, and I have added new tests where possible
  • Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.

Note: PR autogenerated by giles17's agent

…teEvents (microsoft#4783)

_updates_from_task() returned [] for working-state tasks when
background=False, silently discarding all intermediate message content
from task.status.message. Now extracts and yields message parts from
in-progress status updates during streaming.

Also fixed MockA2AClient.send_message to yield all queued responses
(enabling multi-event streaming tests) and added text parameter to
add_in_progress_task_response for tests that need status messages.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings March 19, 2026 21:08
@giles17 giles17 self-assigned this Mar 19, 2026
Copy link
Contributor Author

@giles17 giles17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated Code Review

Reviewers: 4 | Confidence: 89%

✓ Correctness

The production code change correctly addresses the bug where in-progress streaming updates with message content were silently dropped when background=False. The new guard clause in _updates_from_task properly surfaces status.message content after the background/terminal checks. The mock change (yielding all responses instead of one) is required for the new multi-event tests and doesn't break existing tests since they each enqueue only a single response. The new tests cover the key scenarios well. The only minor issue is a stale docstring on _map_a2a_stream that still describes all in-progress updates as silently consumed in non-background mode.

✓ Security Reliability

The fix correctly surfaces in-progress streaming message content that was previously silently dropped. The new code block follows existing patterns for parsing A2A parts and mapping roles. The mock send_message change from pop-one to yield-all is necessary to simulate realistic multi-event streams and doesn't break existing single-response tests. No security or reliability issues found — no injection risks, no resource leaks, and input validation (None checks, empty-parts checks, empty-contents checks) is thorough.

✗ Test Coverage

The three new tests cover the primary happy paths for the bug fix (multiple working updates with messages, single working update, and working update without message). However, there are meaningful gaps: the user-role mapping branch in the new production code is never exercised (all tests hardcode A2ARole.agent), the interaction between background=True and a status message is untested, and there is no test for the edge case where status.message exists but parts is empty.

✗ Design Approach

The fix correctly surfaces task.status.message content during in-progress streaming events and is well-grounded: the A2A SDK's ClientTaskManager copies event.status (including event.status.message) into task.status, so reading from task.status.message in _updates_from_task is equivalent to reading from the discarded _update_event. However, the SDK also appends each working-state message to task.history. Because _parse_messages_from_task falls back to task.history[-1] when a terminal task has no artifacts, the last working-state message already surfaced during streaming will be yielded a second time by the terminal-task handler — a duplication scenario introduced by this fix. All three new tests use artifact-based terminal responses, so they never exercise this fallback. The mock change from pop(0) to iterating-all-and-clearing is safe for the existing test suite (all tests call send_message once) and is necessary to deliver multiple responses in a single streaming call.

Flagged Issues

  • The role ternary ("assistant" if ... == A2ARole.agent else "user") at line 385 of _agent.py is only tested for the A2ARole.agent branch. The "user" path is never exercised because add_in_progress_task_response hardcodes role=A2ARole.agent. A test with role=A2ARole.user is needed to cover the else branch.
  • Potential double-delivery of the last working-state message: the A2A SDK's ClientTaskManager appends each event.status.message to task.history. When the terminal event arrives without artifacts, _parse_messages_from_task falls back to task.history[-1], which is the last working-state message already surfaced during streaming. None of the new tests cover a no-artifact terminal response after working-state messages, so this duplication path is untested.

Suggestions

  • The _map_a2a_stream docstring (lines 316-318) still says in-progress task updates are "silently consumed" when background=False, which is no longer accurate. Update it to note that updates with message content are now yielded.
  • The new code block at line 379 does not guard on task.status.state in IN_PROGRESS_TASK_STATES. While currently unreachable for unknown states, adding an explicit state guard would be more defensive against future A2A spec additions.
  • Add a test for background=True with a working-state update carrying a status.message, to verify the continuation-token path (line 366) takes priority and message content is not surfaced. This documents the intentional precedence and guards against regressions.
  • Consider adding a test where task.status.message is present but parts is an empty list, to verify the guard on line 379 correctly falls through to return [].
  • When background=True and the task is in-progress, the early-return at line 366 silently drops status.message content. Add a comment clarifying this is intentional, or consider whether the continuation-token update should also carry the contents from task.status.message.

Automated review by giles17's agents

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes A2A streaming behavior in the Python A2A integration so callers can receive intermediate message content emitted via in-progress TaskStatusUpdateEvents (e.g., working updates with status.message text parts), addressing #4783.

Changes:

  • Update A2AAgent._updates_from_task() to convert in-progress task.status.message.parts into AgentResponseUpdates (when not running in background=True mode).
  • Enhance the test mock client to yield multiple queued SSE-like events per send_message() call.
  • Add regression tests covering streaming of multiple in-progress status messages, a single in-progress message, and skipping in-progress updates without message content.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File Description
python/packages/a2a/agent_framework_a2a/_agent.py Surfaces status.message content for in-progress task updates as AgentResponseUpdates.
python/packages/a2a/tests/test_a2a_agent.py Adds streaming regression tests and updates the mock client to support multi-event streams.

@giles17 giles17 marked this pull request as draft March 19, 2026 21:26
@giles17 giles17 marked this pull request as ready for review March 19, 2026 21:28
@markwallace-microsoft
Copy link
Member

markwallace-microsoft commented Mar 20, 2026

Python Test Coverage

Python Test Coverage Report •
FileStmtsMissCoverMissing
packages/a2a/agent_framework_a2a
   _agent.py2101692%352, 357, 359, 475, 496, 517, 537, 551, 565, 577–578, 620–621, 650–652
TOTAL27300321888% 

Python Unit Test Overview

Tests Skipped Failures Errors Time
5355 20 💤 0 ❌ 0 🔥 1m 27s ⏱️

Copy link
Contributor Author

@giles17 giles17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated Code Review

Reviewers: 4 | Confidence: 89%

✓ Correctness

The fix correctly addresses the bug where streaming in-progress TaskStatusUpdateEvents with message content were silently dropped. The new emit_intermediate flag gates intermediate content emission on stream=True, preserving backward compatibility for non-streaming callers. The mock send_message change from pop-one to yield-all is necessary for multi-event streaming tests and is backward-compatible since all existing tests add only one response. Tests are thorough, covering role mapping, background precedence, and edge cases. No correctness issues found.

✓ Security Reliability

This PR fixes a real bug where streaming in-progress status updates with message content were silently dropped. The implementation is clean: it adds an emit_intermediate flag propagated from stream in run(), and surfaces task.status.message parts for non-background streaming callers. The background check correctly takes precedence (returning a continuation token instead of content). The mock send_message change from single-pop to yield-all is necessary for multi-event streaming tests and is safe since all existing tests add exactly one response. No security issues (no injection, deserialization, secrets, or resource leak risks). One minor defensive coding suggestion regarding an explicit state guard on the new intermediate block.

✓ Test Coverage

The new tests provide solid coverage for the emit_intermediate feature: multiple streaming updates, single update, no-message skip, role mapping, background precedence, non-streaming exclusion, and terminal-after-working. The mock send_message change from yield-one to yield-all is backward-compatible since all existing tests add a single response. One minor gap: there is no test for the edge case where task.status.message has an empty parts list (not None but []), which exercises the task.status.message.parts truthiness check. Additionally, the response_id field on intermediate updates is never asserted.

✓ Design Approach

The fix correctly addresses the root cause: streaming TaskStatusUpdateEvent items with non-terminal (working) states were silently dropped because _updates_from_task had no path to yield intermediate message content. Adding emit_intermediate (tied to stream=True) and reading task.status.message is a clean, minimal change. Two design concerns remain: (1) the _update_event from the stream tuple is still unconditionally discarded — the fix reads the task snapshot (task.status.message) rather than the event's own payload (_update_event.status.message), which is the authoritative source for streaming content and works only because the client happens to embed the event data into the task snapshot; (2) the mock's send_message is silently changed from 'pop-one-per-call' to 'yield-all-then-clear', which is necessary for multi-event tests but would silently break any future test that calls run() twice against accumulated responses — the mock now behaves differently from the old design without documentation of that contract change.

Suggestions

  • Consider whether run(stream=True, background=True) should surface both the intermediate message content and the continuation token rather than silently discarding the message. The test test_background_with_status_message_yields_continuation_token verifies that background takes precedence and content is lost, which is a reasonable default but may surprise callers who want both.
  • The emit_intermediate block (around line 447) does not verify task.status.state in IN_PROGRESS_TASK_STATES, unlike the background block above it. Adding an explicit state check would be more defensive and consistent: if the A2A protocol adds a new non-terminal, non-in-progress state in the future, content could be unexpectedly emitted.
  • The _update_event from task, _update_event = item in _map_a2a_stream is still discarded. The TaskStatusUpdateEvent is the authoritative source for streamed content, not the task snapshot. If a real A2A client ever provides a non-None event whose status.message differs from task.status.message, intermediate content will be read from the wrong source. Consider reading from the event when available and falling back to the task snapshot.
  • The send_message mock change from pop-one-per-call to yield-all-then-clear is implicitly required by the new tests but isn't documented. Adding a docstring comment explaining the new contract ('all queued responses are consumed by a single send_message call') would prevent future test authors from relying on the removed per-call behavior.
  • Add a test where task.status.message exists but has an empty parts list ([]), verifying the update is skipped. This exercises the task.status.message.parts falsiness branch distinctly from message=None.
  • Consider asserting response_id on intermediate updates (e.g., assert updates[0].response_id == "task-w") in at least one streaming test to verify task ID propagation.
  • In test_terminal_no_artifacts_after_working_with_content, the assertion updates[1].contents == [] documents that a terminal task with no artifacts always yields an empty-content update. Consider whether streaming consumers should receive this empty sentinel at all; if not, the fix to suppress it is out of scope here but worth a follow-up issue.

Automated review by giles17's agents

@@ -373,7 +379,7 @@ async def _map_a2a_stream(
yield update
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _update_event (TaskStatusUpdateEvent) is still discarded. The fix reads task.status.message as a proxy, but the event's own status.message is the authoritative payload for this streaming frame. If a real client provides a non-None event whose status diverges from the task snapshot (e.g., stale snapshot), intermediate content will be silently wrong. Prefer reading from the event when available: source = _update_event.status if _update_event is not None else task.status.

Suggested change
yield update
task, _update_event = item
effective_status = _update_event.status if _update_event is not None else task.status
for update in self._updates_from_task(task, background=background, emit_intermediate=emit_intermediate, status_override=effective_status):

return [
AgentResponseUpdate(
contents=contents,
role="assistant" if task.status.message.role == A2ARole.agent else "user",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defensive: Consider adding task.status.state in IN_PROGRESS_TASK_STATES to this condition, matching the pattern used by the background branch above. Without it, any unknown future non-terminal task state would emit intermediate content when streaming, which may not be intended.

Suggested change
role="assistant" if task.status.message.role == A2ARole.agent else "user",
if emit_intermediate and task.status.state in IN_PROGRESS_TASK_STATES and task.status.message is not None and task.status.message.parts:

@@ -102,9 +111,9 @@ async def send_message(self, message: Any) -> AsyncIterator[Any]:
"""Mock send_message method that yields responses."""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantics changed from 'pop one response per send_message call' to 'yield all queued responses and clear'. This is correct for multi-event streaming but silently removes support for multi-call test scenarios. A short comment documenting the new contract would prevent future surprises.

Suggested change
"""Mock send_message method that yields responses."""
# All queued responses are delivered as a single streaming batch per call.
for response in self.responses:
yield response
self.responses.clear()

mock_a2a_client.add_in_progress_task_response("task-n", context_id="ctx-n")
mock_a2a_client.add_task_response("task-n", [{"id": "art-n", "content": "Result"}])

updates: list[AgentResponseUpdate] = []
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test verifies stream=False doesn't surface intermediate messages. Consider also adding a test for the case where status.message exists but parts is an empty list [], to confirm the truthiness guard in _updates_from_task works as expected (distinct from message=None).

…nd add missing test coverage

- Add emit_intermediate parameter to _updates_from_task and _map_a2a_stream
- Thread stream flag from run() so only streaming callers see intermediate updates
- Add IN_PROGRESS_TASK_STATES guard to emit_intermediate condition
- Add role parameter to test helper add_in_progress_task_response
- Add clarifying comment on MockA2AClient.send_message batch semantics
- Add tests for user role mapping, background precedence, non-streaming behavior,
  terminal task with no artifacts, and empty parts edge case

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Python: [Bug]: A2AAgent failed to parse message content from TaskStatusUpdateEvent

3 participants