From 750e402f3a73512f25d875afd8a055cffb543738 Mon Sep 17 00:00:00 2001 From: Zelys Date: Sat, 4 Apr 2026 16:40:35 -0500 Subject: [PATCH] feat: emit contextWindowFallback stream event before context reduction When the agent catches a ContextWindowOverflowException it now yields a contextWindowFallback streaming event before calling reduce_context, giving callers a real-time signal that context compression is starting. Adds ContextWindowFallbackEvent TypedDict to streaming.py, a matching ContextWindowFallbackStreamEvent TypedEvent subclass in _events.py, and wires it into the overflow handler in agent.py. Closes #1511 --- src/strands/agent/agent.py | 11 ++++++++++- src/strands/types/_events.py | 18 +++++++++++++++++- src/strands/types/streaming.py | 16 ++++++++++++++++ tests/strands/agent/test_agent.py | 27 +++++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 2 deletions(-) diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py index 3a23133de..e2c84cf3e 100644 --- a/src/strands/agent/agent.py +++ b/src/strands/agent/agent.py @@ -57,7 +57,14 @@ from ..tools.registry import ToolRegistry from ..tools.structured_output._structured_output_context import StructuredOutputContext from ..tools.watcher import ToolWatcher -from ..types._events import AgentResultEvent, EventLoopStopEvent, InitEventLoopEvent, ModelStreamChunkEvent, TypedEvent +from ..types._events import ( + AgentResultEvent, + ContextWindowFallbackStreamEvent, + EventLoopStopEvent, + InitEventLoopEvent, + ModelStreamChunkEvent, + TypedEvent, +) from ..types.agent import AgentInput, ConcurrentInvocationMode from ..types.content import ContentBlock, Message, Messages, SystemContentBlock from ..types.exceptions import ConcurrencyException, ContextWindowOverflowException @@ -963,6 +970,8 @@ async def _execute_event_loop_cycle( yield event except ContextWindowOverflowException as e: + # Notify callers that context compression is starting before the blocking operation + yield ContextWindowFallbackStreamEvent(message=str(e)) # Try reducing the context size and retrying self.conversation_manager.reduce_context(self, e=e) diff --git a/src/strands/types/_events.py b/src/strands/types/_events.py index 1d5a5de79..7d92332f4 100644 --- a/src/strands/types/_events.py +++ b/src/strands/types/_events.py @@ -16,7 +16,7 @@ from .citations import Citation from .content import Message from .event_loop import Metrics, StopReason, Usage -from .streaming import ContentBlockDelta, StreamEvent +from .streaming import ContentBlockDelta, ContextWindowFallbackEvent, StreamEvent from .tools import ToolResult, ToolUse if TYPE_CHECKING: @@ -216,6 +216,22 @@ def is_callback_event(self) -> bool: return False +class ContextWindowFallbackStreamEvent(TypedEvent): + """Event emitted when the agent handles a context window overflow by reducing context. + + Fired immediately before the conversation manager reduces the context, giving + callers a real-time signal that context compression is about to begin. + """ + + def __init__(self, message: str) -> None: + """Initialize with the overflow error message. + + Args: + message: The overflow error message from the model provider. + """ + super().__init__({"contextWindowFallback": ContextWindowFallbackEvent(message=message)}) + + class EventLoopStopEvent(TypedEvent): """Event emitted when the agent execution completes normally.""" diff --git a/src/strands/types/streaming.py b/src/strands/types/streaming.py index 8ec2e8d7b..636e2cfc3 100644 --- a/src/strands/types/streaming.py +++ b/src/strands/types/streaming.py @@ -205,6 +205,20 @@ class RedactContentEvent(TypedDict, total=False): redactAssistantContentMessage: str | None +class ContextWindowFallbackEvent(TypedDict): + """Event emitted when the agent handles a context window overflow. + + Fired before the conversation manager reduces the context, giving callers a + real-time signal that context compression is about to begin. Useful for + showing progress indicators or logging during long-running agent sessions. + + Attributes: + message: The overflow error message from the model provider. + """ + + message: str + + class StreamEvent(TypedDict, total=False): """The messages output stream. @@ -212,6 +226,7 @@ class StreamEvent(TypedDict, total=False): contentBlockDelta: Delta content for a content block. contentBlockStart: Start of a content block. contentBlockStop: End of a content block. + contextWindowFallback: Context window overflow is being handled by reducing context. internalServerException: Internal server error information. messageStart: Start of a message. messageStop: End of a message. @@ -225,6 +240,7 @@ class StreamEvent(TypedDict, total=False): contentBlockDelta: ContentBlockDeltaEvent contentBlockStart: ContentBlockStartEvent contentBlockStop: ContentBlockStopEvent + contextWindowFallback: ContextWindowFallbackEvent internalServerException: ExceptionEvent messageStart: MessageStartEvent messageStop: MessageStopEvent diff --git a/tests/strands/agent/test_agent.py b/tests/strands/agent/test_agent.py index 5a3cce11c..b867cef3a 100644 --- a/tests/strands/agent/test_agent.py +++ b/tests/strands/agent/test_agent.py @@ -2756,3 +2756,30 @@ def test_as_tool_defaults_description_when_agent_has_none(): tool = agent.as_tool() assert tool.tool_spec["description"] == "Use the researcher agent as a tool by providing a natural language input" + + +@pytest.mark.asyncio +async def test_stream_async_emits_context_window_fallback_event(mock_model, agent, agenerator, alist): + """stream_async yields contextWindowFallback before reducing context on overflow.""" + overflow_message = "Input is too long for requested model" + + mock_model.mock_stream.side_effect = [ + ContextWindowOverflowException(RuntimeError(overflow_message)), + agenerator( + [ + {"contentBlockStart": {"contentBlockIndex": 0, "start": {}}}, + {"contentBlockDelta": {"contentBlockIndex": 0, "delta": {"text": "OK"}}}, + {"contentBlockStop": {"contentBlockIndex": 0}}, + {"messageStop": {"stopReason": "end_turn"}}, + ] + ), + ] + + # Prevent reduce_context from raising when there are no messages to trim. + agent.conversation_manager.reduce_context = unittest.mock.Mock() + + events = await alist(agent.stream_async("hello")) + + fallback_events = [e for e in events if "contextWindowFallback" in e] + assert len(fallback_events) == 1 + assert overflow_message in fallback_events[0]["contextWindowFallback"]["message"]