From e10e52205deb763d25104919ffef53954aadba1b Mon Sep 17 00:00:00 2001 From: STHITAPRAJNAS Date: Sat, 4 Apr 2026 13:23:07 +0000 Subject: [PATCH 1/2] fix(langchain): pass trace_name to propagate_attributes in on_chain_start MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When CallbackHandler.on_chain_start fires at the root of a chain (parent_run_id is None), propagate_attributes was called without a trace_name, so the trace name was determined by whichever internal node's on_chain_start happened to fire first. On LangGraph resume (e.g. after a human-in-the-loop interrupt) that node is often an internal subgraph whose name is "", which produces a blank trace name. The fix passes span_name — the name already computed from the serialized runnable and kwargs — as trace_name to propagate_attributes. This ensures the trace name is always pinned to the root chain's name regardless of execution order on resume. As a companion change, _parse_langfuse_trace_attributes now also reads a langfuse_trace_name key from LangChain metadata, consistent with the existing langfuse_session_id / langfuse_user_id / langfuse_tags pattern. When present, metadata langfuse_trace_name takes priority over the computed span_name. The key is also added to the strip-list in _strip_langfuse_keys_from_dict so it does not leak into observation metadata. 
Fixes #1602 --- langfuse/langchain/CallbackHandler.py | 7 + tests/test_langchain_callback_unit.py | 267 ++++++++++++++++++++++++++ 2 files changed, 274 insertions(+) create mode 100644 tests/test_langchain_callback_unit.py diff --git a/langfuse/langchain/CallbackHandler.py b/langfuse/langchain/CallbackHandler.py index 8d2c8db90..bfc3fdef8 100644 --- a/langfuse/langchain/CallbackHandler.py +++ b/langfuse/langchain/CallbackHandler.py @@ -287,6 +287,11 @@ def _parse_langfuse_trace_attributes( ): attributes["user_id"] = metadata["langfuse_user_id"] + if "langfuse_trace_name" in metadata and isinstance( + metadata["langfuse_trace_name"], str + ): + attributes["trace_name"] = metadata["langfuse_trace_name"] + if tags is not None or ( "langfuse_tags" in metadata and isinstance(metadata["langfuse_tags"], list) ): @@ -365,6 +370,7 @@ def on_chain_start( ) self._propagation_context_manager = propagate_attributes( + trace_name=parsed_trace_attributes.get("trace_name") or span_name, user_id=parsed_trace_attributes.get("user_id", None), session_id=parsed_trace_attributes.get("session_id", None), tags=parsed_trace_attributes.get("tags", None), @@ -1403,6 +1409,7 @@ def _strip_langfuse_keys_from_dict( "langfuse_session_id", "langfuse_user_id", "langfuse_tags", + "langfuse_trace_name", ] metadata_copy = metadata.copy() diff --git a/tests/test_langchain_callback_unit.py b/tests/test_langchain_callback_unit.py new file mode 100644 index 000000000..a7453cce4 --- /dev/null +++ b/tests/test_langchain_callback_unit.py @@ -0,0 +1,267 @@ +"""Unit tests for CallbackHandler trace-name propagation. + +These tests cover the fix for on_chain_start not passing trace_name to +propagate_attributes, which caused non-deterministic trace names on LangGraph +resume (e.g. after a human-in-the-loop interrupt). + +No real API calls are made — propagate_attributes and get_client are mocked. 
+""" + +import uuid +from contextlib import contextmanager +from typing import Any, Dict, Optional +from unittest.mock import MagicMock, call, patch + +import pytest + +from langfuse.langchain import CallbackHandler +from langfuse.langchain.CallbackHandler import ( + LangchainCallbackHandler, + _strip_langfuse_keys_from_dict, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_handler() -> CallbackHandler: + """Return a CallbackHandler with Langfuse SDK calls mocked out.""" + with patch("langfuse.langchain.CallbackHandler.get_client") as mock_get_client: + mock_client = MagicMock() + mock_client.start_observation.return_value = MagicMock(trace_id="trace-123") + mock_get_client.return_value = mock_client + handler = CallbackHandler() + # Keep a reference so tests can inspect it + handler._langfuse_client = MagicMock() + handler._langfuse_client.start_observation.return_value = MagicMock( + trace_id="trace-123" + ) + return handler + + +def _make_run_id() -> uuid.UUID: + return uuid.uuid4() + + +# --------------------------------------------------------------------------- +# Tests: _parse_langfuse_trace_attributes +# --------------------------------------------------------------------------- + + +class TestParseLangfuseTraceAttributes: + def _parse(self, handler, metadata=None, tags=None): + return handler._parse_langfuse_trace_attributes( + metadata=metadata, tags=tags + ) + + def test_extracts_trace_name_from_metadata(self): + handler = _make_handler() + result = self._parse( + handler, metadata={"langfuse_trace_name": "my-agent"} + ) + assert result["trace_name"] == "my-agent" + + def test_ignores_non_string_trace_name(self): + handler = _make_handler() + result = self._parse(handler, metadata={"langfuse_trace_name": 42}) + assert "trace_name" not in result + + def test_does_not_set_trace_name_when_absent(self): + handler = 
_make_handler() + result = self._parse(handler, metadata={"langfuse_session_id": "s1"}) + assert "trace_name" not in result + + def test_extracts_all_attributes_together(self): + handler = _make_handler() + result = self._parse( + handler, + metadata={ + "langfuse_trace_name": "agent", + "langfuse_session_id": "sess-1", + "langfuse_user_id": "user-1", + }, + ) + assert result["trace_name"] == "agent" + assert result["session_id"] == "sess-1" + assert result["user_id"] == "user-1" + + +# --------------------------------------------------------------------------- +# Tests: on_chain_start passes trace_name to propagate_attributes +# --------------------------------------------------------------------------- + + +class TestOnChainStartTraceNamePropagation: + """Verify that on_chain_start forwards trace_name to propagate_attributes.""" + + def _run_on_chain_start( + self, + handler: CallbackHandler, + serialized: Optional[Dict[str, Any]] = None, + name: Optional[str] = None, + parent_run_id: Optional[uuid.UUID] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> uuid.UUID: + run_id = _make_run_id() + kwargs: Dict[str, Any] = {} + if name is not None: + kwargs["name"] = name + handler.on_chain_start( + serialized=serialized or {}, + inputs={}, + run_id=run_id, + parent_run_id=parent_run_id, + metadata=metadata, + **kwargs, + ) + return run_id + + def test_trace_name_passed_to_propagate_attributes(self): + """span_name derived from serialized['name'] is forwarded as trace_name.""" + handler = _make_handler() + + @contextmanager + def _noop_ctx(*args, **kwargs): + yield + + with patch( + "langfuse.langchain.CallbackHandler.propagate_attributes" + ) as mock_pa: + mock_pa.return_value = MagicMock( + __enter__=MagicMock(return_value=None), + __exit__=MagicMock(return_value=False), + ) + self._run_on_chain_start( + handler, + serialized={"name": "my-agent"}, + parent_run_id=None, + ) + + mock_pa.assert_called_once() + _, kwargs = mock_pa.call_args + assert 
kwargs.get("trace_name") == "my-agent" + + def test_trace_name_uses_kwargs_name_over_serialized(self): + """The 'name' kwarg takes priority over serialized dict (LangChain convention).""" + handler = _make_handler() + + with patch( + "langfuse.langchain.CallbackHandler.propagate_attributes" + ) as mock_pa: + mock_pa.return_value = MagicMock( + __enter__=MagicMock(return_value=None), + __exit__=MagicMock(return_value=False), + ) + self._run_on_chain_start( + handler, + serialized={"name": "fallback-name"}, + name="explicit-name", + parent_run_id=None, + ) + + _, kwargs = mock_pa.call_args + assert kwargs.get("trace_name") == "explicit-name" + + def test_metadata_langfuse_trace_name_overrides_span_name(self): + """langfuse_trace_name in metadata takes priority over computed span_name.""" + handler = _make_handler() + + with patch( + "langfuse.langchain.CallbackHandler.propagate_attributes" + ) as mock_pa: + mock_pa.return_value = MagicMock( + __enter__=MagicMock(return_value=None), + __exit__=MagicMock(return_value=False), + ) + self._run_on_chain_start( + handler, + serialized={"name": "computed-name"}, + metadata={"langfuse_trace_name": "override-name"}, + parent_run_id=None, + ) + + _, kwargs = mock_pa.call_args + assert kwargs.get("trace_name") == "override-name" + + def test_propagate_attributes_not_called_for_child_runs(self): + """propagate_attributes must only be called at the root (parent_run_id=None).""" + handler = _make_handler() + root_run_id = _make_run_id() + handler._child_to_parent_run_id_map[root_run_id] = None + + with patch( + "langfuse.langchain.CallbackHandler.propagate_attributes" + ) as mock_pa: + mock_pa.return_value = MagicMock( + __enter__=MagicMock(return_value=None), + __exit__=MagicMock(return_value=False), + ) + # Child run — parent_run_id is set + self._run_on_chain_start( + handler, + serialized={"name": "child-node"}, + parent_run_id=root_run_id, + ) + + mock_pa.assert_not_called() + + def test_empty_span_name_still_propagated(self): 
+ """Even when span_name resolves to '', it should still be forwarded.""" + handler = _make_handler() + + with patch( + "langfuse.langchain.CallbackHandler.propagate_attributes" + ) as mock_pa: + mock_pa.return_value = MagicMock( + __enter__=MagicMock(return_value=None), + __exit__=MagicMock(return_value=False), + ) + # Empty serialized — span_name will be '' + self._run_on_chain_start( + handler, + serialized=None, + parent_run_id=None, + ) + + _, kwargs = mock_pa.call_args + # '' is what get_langchain_run_name returns for None serialized + assert kwargs.get("trace_name") == "" + + +# --------------------------------------------------------------------------- +# Tests: _strip_langfuse_keys_from_dict strips langfuse_trace_name +# --------------------------------------------------------------------------- + + +class TestStripLangfuseKeys: + def test_strips_langfuse_trace_name(self): + metadata = { + "langfuse_trace_name": "my-agent", + "other_key": "value", + } + result = _strip_langfuse_keys_from_dict(metadata, keep_langfuse_trace_attributes=False) + assert "langfuse_trace_name" not in result + assert result["other_key"] == "value" + + def test_keeps_langfuse_trace_name_when_flag_set(self): + metadata = { + "langfuse_trace_name": "my-agent", + "other_key": "value", + } + result = _strip_langfuse_keys_from_dict(metadata, keep_langfuse_trace_attributes=True) + assert result["langfuse_trace_name"] == "my-agent" + + def test_strips_all_trace_attribute_keys_together(self): + metadata = { + "langfuse_trace_name": "n", + "langfuse_session_id": "s", + "langfuse_user_id": "u", + "langfuse_tags": ["t"], + "keep_me": 1, + } + result = _strip_langfuse_keys_from_dict(metadata, keep_langfuse_trace_attributes=False) + for key in ("langfuse_trace_name", "langfuse_session_id", "langfuse_user_id", "langfuse_tags"): + assert key not in result + assert result["keep_me"] == 1 From 7598aedd067ce3f816155f278a870510c3ca6fb6 Mon Sep 17 00:00:00 2001 From: STHITAPRAJNAS Date: Sat, 4 Apr 
2026 20:08:48 +0000 Subject: [PATCH 2/2] fix(openai): tool_calls dropped when content chunk precedes tool deltas in stream get_response_for_chat() built its return value with a Python `or` chain: return completion["content"] or (completion["function_call"] and {...}) or (completion["tool_calls"] and {...}) or None Models like Qwen and DeepSeek emit a non-empty content chunk (often "\n\n" or a brief reasoning prefix) before streaming the tool-call deltas. Because a non-empty string is truthy, the `or` chain short-circuited at the content branch and returned just the whitespace string, silently discarding all accumulated tool_call data. Fix: check tool_calls first. When tool_calls are present, return them as the primary output. If the content is non-whitespace (e.g. a genuine reasoning preamble) it is included alongside the tool_calls rather than dropped. The plain content path is unchanged. The function_call (legacy OpenAI format) payload is unchanged as well, but that branch is now also evaluated before content, so a preceding content chunk no longer shadows it. Fixes langfuse/langfuse#12490 --- langfuse/openai.py | 45 ++--- tests/test_openai_streaming_unit.py | 255 ++++++++++++++++++++++++++++ 2 files changed, 279 insertions(+), 21 deletions(-) create mode 100644 tests/test_openai_streaming_unit.py diff --git a/langfuse/openai.py b/langfuse/openai.py index 16d293e73..530a1857a 100644 --- a/langfuse/openai.py +++ b/langfuse/openai.py @@ -722,27 +722,30 @@ def _extract_streamed_openai_response(resource: Any, chunks: Any) -> Any: completion += choice.get("text", "") def get_response_for_chat() -> Any: - return ( - completion["content"] - or ( - completion["function_call"] - and { - "role": "assistant", - "function_call": completion["function_call"], - } - ) - or ( - completion["tool_calls"] - and { - "role": "assistant", - # "tool_calls": [{"function": completion["tool_calls"]}], - "tool_calls": [ - {"function": data} for data in completion["tool_calls"] - ], - } - ) - or None - ) + # tool_calls must be checked before content: models like Qwen/DeepSeek emit + # whitespace content chunks (e.g. 
"\n\n") before streaming tool call deltas, + # which would otherwise cause the content branch to short-circuit and silently + # drop the collected tool_calls. + if completion["tool_calls"]: + result: Any = { + "role": "assistant", + "tool_calls": [ + {"function": data} for data in completion["tool_calls"] + ], + } + # Preserve non-whitespace content that co-exists with tool calls + # (some reasoning models emit a brief text preamble before calling tools). + if completion["content"] and completion["content"].strip(): + result["content"] = completion["content"] + return result + + if completion["function_call"]: + return { + "role": "assistant", + "function_call": completion["function_call"], + } + + return completion["content"] or None return ( model, diff --git a/tests/test_openai_streaming_unit.py b/tests/test_openai_streaming_unit.py new file mode 100644 index 000000000..5f6c79b15 --- /dev/null +++ b/tests/test_openai_streaming_unit.py @@ -0,0 +1,255 @@ +"""Unit tests for _extract_streamed_openai_response / get_response_for_chat. + +Covers the bug where a non-empty content chunk (e.g. "\n\n") emitted before +tool-call deltas caused get_response_for_chat() to short-circuit and silently +drop all collected tool_calls from the logged generation output. + +No real OpenAI API calls — chunks are built from SimpleNamespace objects that +mirror the __dict__ structure of openai-python v1 Pydantic models. 
+""" + +import types +from dataclasses import dataclass +from typing import Any, List, Optional +from unittest.mock import patch + +import pytest + +from langfuse.openai import OpenAiDefinition, _extract_streamed_openai_response + + +# --------------------------------------------------------------------------- +# Helpers: fake OpenAI v1 streaming chunk objects +# --------------------------------------------------------------------------- + +def _make_tool_call_delta( + name: Optional[str] = None, + arguments: str = "", + index: int = 0, + tool_id: Optional[str] = None, + call_type: Optional[str] = None, +) -> Any: + """Build a ChoiceDeltaToolCall-alike SimpleNamespace.""" + function = types.SimpleNamespace(name=name, arguments=arguments) + return types.SimpleNamespace( + index=index, + id=tool_id, + type=call_type, + function=function, + ) + + +def _make_chunk( + content: Optional[str] = None, + tool_calls: Optional[List[Any]] = None, + function_call: Any = None, + role: Optional[str] = None, + finish_reason: Optional[str] = None, + model: str = "gpt-4o", +) -> Any: + """Build a streaming chunk SimpleNamespace (mirrors chunk.__dict__ in v1).""" + delta = types.SimpleNamespace( + role=role, + content=content, + tool_calls=tool_calls, + function_call=function_call, + ) + choice = types.SimpleNamespace(delta=delta, finish_reason=finish_reason) + return types.SimpleNamespace(model=model, choices=[choice], usage=None) + + +def _chat_resource() -> OpenAiDefinition: + return OpenAiDefinition( + module="openai", + object="ChatCompletion", + method="create", + type="chat", + sync=True, + ) + + +def _run(chunks: List[Any]) -> Any: + """Run _extract_streamed_openai_response with is_openai_v1 patched to True.""" + with patch("langfuse.openai._is_openai_v1", return_value=True): + _, response, _, _ = _extract_streamed_openai_response(_chat_resource(), iter(chunks)) + return response + + +# --------------------------------------------------------------------------- +# Bug 
reproduction: content chunk before tool_calls +# --------------------------------------------------------------------------- + + +class TestToolCallsWithPrecedingContentChunk: + """ + Models like Qwen/DeepSeek sometimes emit a whitespace content chunk + (e.g. "\n\n") before beginning to stream tool-call deltas. Previously + get_response_for_chat() evaluated `completion["content"] or ...` and + returned the content string immediately, dropping the tool_calls entirely. + """ + + def test_tool_calls_not_dropped_when_whitespace_content_precedes_them(self): + chunks = [ + _make_chunk(role="assistant"), + _make_chunk(content="\n\n"), # spurious whitespace before tool call + _make_chunk( + tool_calls=[_make_tool_call_delta(name="get_weather", arguments="")], + ), + _make_chunk( + tool_calls=[_make_tool_call_delta(name=None, arguments='{"city": "Paris"}')], + ), + _make_chunk(finish_reason="tool_calls"), + ] + result = _run(chunks) + + assert isinstance(result, dict), "Expected a dict, not a plain string" + assert "tool_calls" in result, "tool_calls must not be dropped" + assert result["tool_calls"][0]["function"]["name"] == "get_weather" + assert result["tool_calls"][0]["function"]["arguments"] == '{"city": "Paris"}' + + def test_whitespace_only_content_not_included_in_result(self): + """A leading "\n\n" is whitespace-only and should be omitted from output.""" + chunks = [ + _make_chunk(role="assistant"), + _make_chunk(content="\n\n"), + _make_chunk( + tool_calls=[_make_tool_call_delta(name="search", arguments='{"q":"hi"}')], + ), + _make_chunk(finish_reason="tool_calls"), + ] + result = _run(chunks) + + assert "content" not in result or result.get("content") is None + + def test_meaningful_content_preserved_alongside_tool_calls(self): + """When content has real text (not just whitespace), it should be kept.""" + chunks = [ + _make_chunk(role="assistant"), + _make_chunk(content="Sure, let me check that. 
"), + _make_chunk( + tool_calls=[_make_tool_call_delta(name="lookup", arguments='{"id":1}')], + ), + _make_chunk(finish_reason="tool_calls"), + ] + result = _run(chunks) + + assert "tool_calls" in result + assert result.get("content") == "Sure, let me check that. " + + def test_non_whitespace_content_before_tool_calls_preserves_both(self): + chunks = [ + _make_chunk(role="assistant"), + _make_chunk(content="I'll call"), + _make_chunk(content=" the tool."), + _make_chunk( + tool_calls=[_make_tool_call_delta(name="do_thing", arguments="{}")], + ), + _make_chunk(finish_reason="tool_calls"), + ] + result = _run(chunks) + + assert result["tool_calls"][0]["function"]["name"] == "do_thing" + assert result.get("content") == "I'll call the tool." + + +# --------------------------------------------------------------------------- +# Baseline: pure content response (no tools) +# --------------------------------------------------------------------------- + + +class TestPureContentResponse: + def test_plain_text_response_returned_as_string(self): + chunks = [ + _make_chunk(role="assistant"), + _make_chunk(content="Hello, "), + _make_chunk(content="world!"), + _make_chunk(finish_reason="stop"), + ] + result = _run(chunks) + assert result == "Hello, world!" 
+ + def test_empty_stream_returns_none(self): + result = _run([]) + assert result is None + + +# --------------------------------------------------------------------------- +# Pure tool-call response (no content at all) +# --------------------------------------------------------------------------- + + +class TestPureToolCallResponse: + def test_tool_calls_returned_without_content(self): + chunks = [ + _make_chunk(role="assistant"), + _make_chunk( + tool_calls=[_make_tool_call_delta(name="get_price", arguments='{"sku":"A1"}')], + ), + _make_chunk(finish_reason="tool_calls"), + ] + result = _run(chunks) + + assert isinstance(result, dict) + assert "tool_calls" in result + assert result["tool_calls"][0]["function"]["name"] == "get_price" + assert "content" not in result + + def test_multiple_tool_calls_all_returned(self): + chunks = [ + _make_chunk(role="assistant"), + _make_chunk( + tool_calls=[_make_tool_call_delta(name="tool_a", arguments='{"x":1}')], + ), + # second tool call — name triggers a new entry in the accumulator + _make_chunk( + tool_calls=[_make_tool_call_delta(name="tool_b", arguments='{"y":2}')], + ), + _make_chunk(finish_reason="tool_calls"), + ] + result = _run(chunks) + + assert len(result["tool_calls"]) == 2 + names = {tc["function"]["name"] for tc in result["tool_calls"]} + assert names == {"tool_a", "tool_b"} + + +# --------------------------------------------------------------------------- +# Legacy function_call (OpenAI v0 format) +# --------------------------------------------------------------------------- + + +class TestFunctionCallResponse: + def test_function_call_returned_when_no_tool_calls(self): + # function_call uses a different delta key path; simulate with direct + # injection via a SimpleNamespace that has function_call set + chunks = [ + _make_chunk(role="assistant"), + ] + # Patch the completion dict after the fact is tricky; instead, build + # a chunk that triggers the function_call accumulation path. 
+ fn_chunk = types.SimpleNamespace( + model="gpt-3.5-turbo", + choices=[ + types.SimpleNamespace( + delta=types.SimpleNamespace( + role=None, + content=None, + tool_calls=None, + function_call=types.SimpleNamespace( + name="old_fn", + arguments='{"a":1}', + ), + ), + finish_reason=None, + ) + ], + usage=None, + ) + with patch("langfuse.openai._is_openai_v1", return_value=True): + _, result, _, _ = _extract_streamed_openai_response( + _chat_resource(), iter([fn_chunk]) + ) + + assert isinstance(result, dict) + assert "function_call" in result + assert result["function_call"]["name"] == "old_fn"