From 6143c8b715076b8e77efccb3eb54af6ebb8de408 Mon Sep 17 00:00:00 2001 From: Zelys Date: Sun, 5 Apr 2026 17:11:33 -0500 Subject: [PATCH] fix(streaming): handle tool use metadata in contentBlockDelta for non-standard models Some Bedrock models (e.g., Kimi K2.5) send toolUseId and name in contentBlockDelta rather than in contentBlockStart. The parser only extracted input from the delta, leaving current_tool_use without those fields and crashing in handle_content_block_stop. - Capture toolUseId and name from delta when not already set by start - Use .get("input", "") to avoid KeyError when input key is absent - Add warning + skip for incomplete tool use blocks in stop handler Fixes #1646 --- src/strands/event_loop/streaming.py | 23 ++++- tests/strands/event_loop/test_streaming.py | 107 +++++++++++++++++++++ 2 files changed, 127 insertions(+), 3 deletions(-) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index 0a1161135..a13694dbe 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -210,10 +210,19 @@ def handle_content_block_delta( typed_event: ModelStreamEvent = ModelStreamEvent({}) if "toolUse" in delta_content: + tool_use_delta = delta_content["toolUse"] if "input" not in state["current_tool_use"]: state["current_tool_use"]["input"] = "" - state["current_tool_use"]["input"] += delta_content["toolUse"]["input"] + state["current_tool_use"]["input"] += tool_use_delta.get("input", "") + + # Some models provide toolUseId and name in contentBlockDelta instead of contentBlockStart. + # Capture them here if not already set from a prior contentBlockStart event. + if "toolUseId" not in state["current_tool_use"] and "toolUseId" in tool_use_delta: + state["current_tool_use"]["toolUseId"] = tool_use_delta["toolUseId"] + if "name" not in state["current_tool_use"] and "name" in tool_use_delta: + state["current_tool_use"]["name"] = tool_use_delta["name"] + typed_event = ToolUseStreamEvent(delta_content, state["current_tool_use"]) elif "text" in delta_content: @@ -281,8 +290,16 @@ def handle_content_block_stop(state: dict[str, Any]) -> dict[str, Any]: except ValueError: current_tool_use["input"] = {} - tool_use_id = current_tool_use["toolUseId"] - tool_use_name = current_tool_use["name"] + tool_use_id = current_tool_use.get("toolUseId", "") + tool_use_name = current_tool_use.get("name", "") + + if not tool_use_id or not tool_use_name: + logger.warning( + "Incomplete tool use block (missing toolUseId or name); skipping content block. " + "The model may be using a non-standard streaming format." + ) + state["current_tool_use"] = {} + return state tool_use = ToolUse( toolUseId=tool_use_id, diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index 93f8d95f8..dff9c5688 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -1422,3 +1422,110 @@ async def test_process_stream_keeps_tool_use_stop_reason_unchanged(agenerator, a last_event = cast(ModelStopReason, (await alist(stream))[-1]) assert last_event["stop"][0] == "tool_use" + + +def test_handle_content_block_delta_captures_tool_use_id_and_name_from_delta(): + """Delta events that include toolUseId and name should populate current_tool_use.""" + event = {"delta": {"toolUse": {"input": '{"x": 1}', "toolUseId": "abc123", "name": "output_slide"}}} + state = {"current_tool_use": {}} + + updated_state, _ = strands.event_loop.streaming.handle_content_block_delta(event, state) + + assert updated_state["current_tool_use"]["toolUseId"] == "abc123" + assert updated_state["current_tool_use"]["name"] == "output_slide" + assert updated_state["current_tool_use"]["input"] == '{"x": 1}' + + +def test_handle_content_block_delta_does_not_override_existing_tool_use_id_and_name(): + """toolUseId and name from contentBlockStart should not be overridden by a later delta.""" + event = {"delta": {"toolUse": {"input": '{"x": 1}', "toolUseId": "from_delta", "name": "from_delta"}}} + state = {"current_tool_use": {"toolUseId": "from_start", "name": "from_start", "input": ""}} + + updated_state, _ = strands.event_loop.streaming.handle_content_block_delta(event, state) + + assert updated_state["current_tool_use"]["toolUseId"] == "from_start" + assert updated_state["current_tool_use"]["name"] == "from_start" + + +def test_handle_content_block_delta_tool_use_without_input_key(): + """A toolUse delta missing the input key should not raise KeyError.""" + event = {"delta": {"toolUse": {}}} + state = {"current_tool_use": {"toolUseId": "t1", "name": "tool"}} + + updated_state, _ = strands.event_loop.streaming.handle_content_block_delta(event, state) + + assert updated_state["current_tool_use"]["input"] == "" + + +def test_handle_content_block_stop_skips_incomplete_tool_use_missing_id(caplog): + """A tool use block missing toolUseId is skipped with a warning.""" + import logging + + state = { + "content": [], + "current_tool_use": {"name": "output_slide", "input": '{"x": 1}'}, + "text": "", + "reasoningText": "", + "citationsContent": [], + } + + with caplog.at_level(logging.WARNING, logger="strands.event_loop.streaming"): + updated_state = strands.event_loop.streaming.handle_content_block_stop(state) + + assert updated_state["content"] == [] + assert updated_state["current_tool_use"] == {} + assert "Incomplete tool use block" in caplog.text + + +def test_handle_content_block_stop_skips_incomplete_tool_use_missing_name(caplog): + """A tool use block missing name is skipped with a warning.""" + import logging + + state = { + "content": [], + "current_tool_use": {"toolUseId": "abc123", "input": '{"x": 1}'}, + "text": "", + "reasoningText": "", + "citationsContent": [], + } + + with caplog.at_level(logging.WARNING, logger="strands.event_loop.streaming"): + updated_state = strands.event_loop.streaming.handle_content_block_stop(state) + + assert updated_state["content"] == [] + assert updated_state["current_tool_use"] == {} + assert "Incomplete tool use block" in caplog.text + + +@pytest.mark.asyncio +async def test_process_stream_tool_use_info_in_delta(agenerator, alist): + """Models that provide toolUseId and name in contentBlockDelta (not contentBlockStart) work correctly.""" + response = [ + {"messageStart": {"role": "assistant"}}, + {"contentBlockStart": {"start": {}}}, + { + "contentBlockDelta": { + "delta": {"toolUse": {"input": '{"title": "Test"}', "toolUseId": "xyz789", "name": "output_slide"}} + } + }, + {"contentBlockStop": {}}, + {"messageStop": {"stopReason": "tool_use"}}, + { + "metadata": { + "usage": {"inputTokens": 5, "outputTokens": 10, "totalTokens": 15}, + "metrics": {"latencyMs": 50}, + } + }, + ] + + stream = strands.event_loop.streaming.process_stream(agenerator(response)) + events = await alist(stream) + last_event = cast(ModelStopReason, events[-1]) + + stop_reason, message, _, _ = last_event["stop"] + assert stop_reason == "tool_use" + assert len(message["content"]) == 1 + tool_use = message["content"][0]["toolUse"] + assert tool_use["toolUseId"] == "xyz789" + assert tool_use["name"] == "output_slide" + assert tool_use["input"] == {"title": "Test"}