Skip to content

fix: handle premature stream termination for Anthropic (#1868)#2047

Open
gautamsirdeshmukh wants to merge 1 commit intostrands-agents:mainfrom
gautamsirdeshmukh:main
Open

fix: handle premature stream termination for Anthropic (#1868)#2047
gautamsirdeshmukh wants to merge 1 commit intostrands-agents:mainfrom
gautamsirdeshmukh:main

Conversation

@gautamsirdeshmukh
Copy link
Copy Markdown

Problem

The Anthropic provider's stream method tries to read event.message.usage from the last iterated stream event to extract token usage metadata. However, if the stream terminates prematurely and the last stream event's .message attribute has not yet been populated with usage, this line crashes with an AttributeError.

Solution

Instead of checking event.message.usage for the last stream event, we now call Anthropic SDK's stream.get_final_message() method, which returns a "message snapshot" accumulated from all received events rather than relying on the last event's state. This call is wrapped in a try/except/else block, so that if it fails (which is only possible when zero events were received) we log a warning instead of crashing.

Possible Concerns

One may worry that merely logging a warning when get_final_message() call fails could lead to undercounted token usage. However, this method only fails when the stream yields zero events - in which case, there is no usage data to report anyway. If one or more events were received, the Anthropic SDK guarantees that the snapshot contains usage data (initialized by the mandatory message_start event), and get_final_message() will succeed.

Related Issues

#1868

Documentation PR

N/A

Type of Change

Bug fix

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare (added a few tests to cover premature termination + empty stream cases)
  • I also performed a demo of 6 scenarios to compare behavior of old vs new code (for Sonnet and Opus)
======================================================================
  #1868 Fix: Before vs After — claude-sonnet-4-6
======================================================================

  ──────────────────────────────────────────────────────────────────
  Scenario 1: Normal completion
  Full stream with message_stop. Both paths should succeed.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ Last event type: message_stop
    ✅ event.message.usage → {'input_tokens': 100, 'output_tokens': 50}

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=50, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 50}

  ──────────────────────────────────────────────────────────────────
  Scenario 2: Premature termination (TextEvent)
  Stream dies after TextEvent. TextEvent has no .message attribute.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'text']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: text
    ❌ event.message.usage → CRASH: AttributeError: 'TextEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=25, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 25}

  ──────────────────────────────────────────────────────────────────
  Scenario 3: Premature termination (ContentBlockDeltaEvent)
  Stream dies after content_block_delta. No .message on this event type.
  Stream events: ['message_start', 'content_block_delta']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_delta
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockDeltaEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=10, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 10}

  ──────────────────────────────────────────────────────────────────
  Scenario 4: Premature termination (ContentBlockStartEvent)
  Stream dies right after content_block_start. No content delivered.
  Stream events: ['message_start', 'content_block_start']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_start
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStartEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=200, output_tokens=0, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 200, 'output_tokens': 0}

  ──────────────────────────────────────────────────────────────────
  Scenario 5: Content delivered, dies before message_stop
  Full content block delivered but stream dies before message_stop.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_stop
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStopEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=150, output_tokens=40, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 150, 'output_tokens': 40}

  ──────────────────────────────────────────────────────────────────
  Scenario 6: Empty stream (zero events)
  Immediate connection failure. No events received at all.
  Stream events: []
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: (none)
    ❌ event.message.usage → CRASH: UnboundLocalError: cannot access local variable 'event' where it is not associated with a value

  NEW (stream.get_final_message() snapshot):
    ⚠️  Events: []
    ⚠️  get_final_message() → raised exception (caught)
    ⚠️  snapshot.usage → not available (warning logged)

======================================================================
  #1868 Fix: Before vs After — claude-opus-4-6
======================================================================

  ──────────────────────────────────────────────────────────────────
  Scenario 1: Normal completion
  Full stream with message_stop. Both paths should succeed.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ Last event type: message_stop
    ✅ event.message.usage → {'input_tokens': 100, 'output_tokens': 50}

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=50, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 50}

  ──────────────────────────────────────────────────────────────────
  Scenario 2: Premature termination (TextEvent)
  Stream dies after TextEvent. TextEvent has no .message attribute.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'text']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: text
    ❌ event.message.usage → CRASH: AttributeError: 'TextEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=25, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 25}

  ──────────────────────────────────────────────────────────────────
  Scenario 3: Premature termination (ContentBlockDeltaEvent)
  Stream dies after content_block_delta. No .message on this event type.
  Stream events: ['message_start', 'content_block_delta']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_delta
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockDeltaEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=10, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 10}

  ──────────────────────────────────────────────────────────────────
  Scenario 4: Premature termination (ContentBlockStartEvent)
  Stream dies right after content_block_start. No content delivered.
  Stream events: ['message_start', 'content_block_start']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_start
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStartEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=200, output_tokens=0, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 200, 'output_tokens': 0}

  ──────────────────────────────────────────────────────────────────
  Scenario 5: Content delivered, dies before message_stop
  Full content block delivered but stream dies before message_stop.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_stop
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStopEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=150, output_tokens=40, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 150, 'output_tokens': 40}

  ──────────────────────────────────────────────────────────────────
  Scenario 6: Empty stream (zero events)
  Immediate connection failure. No events received at all.
  Stream events: []
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: (none)
    ❌ event.message.usage → CRASH: UnboundLocalError: cannot access local variable 'event' where it is not associated with a value

  NEW (stream.get_final_message() snapshot):
    ⚠️  Events: []
    ⚠️  get_final_message() → raised exception (caught)
    ⚠️  snapshot.usage → not available (warning logged)

======================================================================
  Done.
======================================================================

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@codecov
Copy link
Copy Markdown

codecov bot commented Apr 3, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.

📢 Thoughts on this report? Let us know!

@gautamsirdeshmukh
Copy link
Copy Markdown
Author

gautamsirdeshmukh commented Apr 3, 2026

Looking into why 1-3 integ tests continue to time out intermittently (this change to the Anthropic stream method has zero connection to the failed cases for multi-agent executions, concurrency for tests may need to be optimized).

Edit: Looks like #2044 is seeing the same test failure. Certainly unrelated to either change, as expected.

@mkmeral
Copy link
Copy Markdown
Contributor

mkmeral commented Apr 6, 2026

/strands review

@gautamsirdeshmukh
Copy link
Copy Markdown
Author

/strands are there any issues with this change?

yield event

mock_stream.__aiter__ = mock_aiter
mock_stream.get_final_message.return_value = unittest.mock.Mock(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we build this into the agenerator? That way we dont need to redefine this feature for multiple tests in this file?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on this. The mock stream setup pattern (creating an AsyncMock with __aiter__ and get_final_message) is repeated 3 times across test_stream, test_stream_early_termination, and test_stream_empty. A shared fixture or helper would reduce duplication and make it easier to add more streaming tests. Additionally, test_structured_output (line 892) needs the same update — it still uses the old agenerator pattern which doesn't have get_final_message().

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agenerator is just a plain async generator, it yields items but that's about it. We need to create a custom generator in order to attach/mock the get_final_message method as well. I do agree that we should have a helper method or something of that nature to reduce duplication

@JackYPCOnline
Copy link
Copy Markdown
Contributor

/strands review

yield self.format_chunk({"type": "metadata", "usage": usage.model_dump()})
try:
message_snapshot = await stream.get_final_message()
except Exception as e:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this error catch more targeted? Like AssertionError or something?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. The Anthropic SDK's get_final_message() specifically does:

assert self.__final_message_snapshot is not None

So AssertionError is the right type to catch here. Catching broad Exception masks real issues — for example, test_structured_output currently passes despite using a mock without get_final_message() only because the AttributeError is silently swallowed by except Exception.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, scoping this catch to AttributeError, we should surface unexpected exceptions

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 6, 2026

Assessment: Request Changes

Good fix for a real crash scenario. The approach of using stream.get_final_message() is sound and well-motivated. Two issues need to be addressed before merging:

Review Details
  • Exception handling: The except Exception catch is too broad (as noted by @Unshure). The Anthropic SDK raises AssertionError specifically from get_final_message(). Catching all exceptions masks real bugs — including the one described below.
  • Broken test coverage: test_structured_output (line 892) still uses agenerator (a plain async generator without get_final_message()). It currently only passes because the broad except Exception silently swallows the AttributeError. Narrowing the catch (which is the right thing to do) will expose this as a test failure. This test needs the same mock stream pattern used in the new tests.
  • Test duplication: The mock stream setup is repeated across 3 tests. Consider extracting a shared fixture (as @Unshure suggested), which would also naturally fix the test_structured_output issue.

The core fix is solid and the thorough scenario testing in the PR description is appreciated.

@gautamsirdeshmukh
Copy link
Copy Markdown
Author

/strands review

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 6, 2026

Assessment: Approve

All previous review feedback has been addressed cleanly. The exception catch is now scoped to AssertionError (matching the Anthropic SDK's actual raise), the generate_mock_stream helper eliminates test duplication, and test_structured_output has been updated to use the new pattern.

Review Summary
  • Production code: The stream.get_final_message() approach with try/except AssertionError/else is correct and resilient. The logging format follows project conventions.
  • Test coverage: Three scenarios covered — normal completion, early termination, and empty stream. The test_structured_output test is also properly updated.
  • Helper design: generate_mock_stream() handles both success (return_value) and failure (side_effect) cases via the isinstance(final_message, Exception) check, which is clean.
  • Documentation: N/A appropriate — this is a bug fix with no user-facing API changes.
  • API review: Not required — no public API surface changes.

Solid fix with thorough testing.

@gautamsirdeshmukh
Copy link
Copy Markdown
Author

/strands review

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 6, 2026

Assessment: Approve

All prior review feedback has been addressed. No new issues found.

Review Details
  • Exception handling: except AssertionError correctly matches the Anthropic SDK's get_final_message() failure mode (assert self.__final_message_snapshot is not None). Since until_done() is a no-op after the stream is fully iterated, no other exceptions are expected from this call path.
  • Test helper: generate_mock_stream_context() cleanly encapsulates both the mock stream (with __aiter__ and get_final_message) and the async context manager, eliminating the prior duplication. All 4 call sites pass final_message explicitly.
  • Test coverage: Normal completion, early termination, and empty stream scenarios are covered. test_structured_output is properly updated. All agenerator references removed from affected tests.
  • Conventions: Logging follows AGENTS.md structured format. No public API changes, so no docs PR or API review needed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants