From 1ddde26cae64e4f8d0c9173e8c26ad4736d0e3f9 Mon Sep 17 00:00:00 2001 From: Aditya Mehra Date: Tue, 10 Feb 2026 16:18:53 -0800 Subject: [PATCH] fix(crewai): share singleton handler for OpenAI-v2 context inheritance Switch CrewAI instrumentation to use the shared telemetry handler singleton and add regression coverage that verifies OpenAI-v2 chat spans and metrics inherit CrewAI agent context. Co-authored-by: Cursor --- .../instrumentation/crewai/instrumentation.py | 7 +- .../tests/conftest.py | 5 + .../tests/test_openai_v2_inheritance.py | 154 ++++++++++++++++++ .../tests/test_singleton.py | 22 +-- 4 files changed, 175 insertions(+), 13 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-crewai/tests/test_openai_v2_inheritance.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/instrumentation.py index e44bc15d..934bf037 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/instrumentation.py +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/instrumentation.py @@ -11,7 +11,10 @@ from wrapt import wrap_function_wrapper from opentelemetry.instrumentation.utils import unwrap from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.handler import ( + TelemetryHandler, + get_telemetry_handler, +) from opentelemetry.util.genai.types import ( Workflow, AgentInvocation, @@ -108,7 +111,7 @@ def _instrument(self, **kwargs): meter_provider = metrics.get_meter_provider() - _handler = TelemetryHandler( + _handler = get_telemetry_handler( tracer_provider=tracer_provider, meter_provider=meter_provider ) diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/tests/conftest.py b/instrumentation-genai/opentelemetry-instrumentation-crewai/tests/conftest.py index f1b014a6..2b6f4aa2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-crewai/tests/conftest.py +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/tests/conftest.py @@ -44,17 +44,22 @@ def meter_provider(metric_reader): def reset_global_handler(): """Reset the global handler before and after each test.""" import opentelemetry.instrumentation.crewai.instrumentation as crewai_module + from opentelemetry.util.genai.handler import get_telemetry_handler # Store original value original_handler = crewai_module._handler # Reset before test crewai_module._handler = None + if hasattr(get_telemetry_handler, "_default_handler"): + delattr(get_telemetry_handler, "_default_handler") yield # Reset after test crewai_module._handler = original_handler + if hasattr(get_telemetry_handler, "_default_handler"): + delattr(get_telemetry_handler, "_default_handler") @pytest.fixture(autouse=True) diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/tests/test_openai_v2_inheritance.py b/instrumentation-genai/opentelemetry-instrumentation-crewai/tests/test_openai_v2_inheritance.py new file mode 100644 index 00000000..8f83e7ba --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/tests/test_openai_v2_inheritance.py @@ -0,0 +1,154 @@ +"""CrewAI + OpenAI-v2 context inheritance tests. + +These tests validate that when CrewAI and OpenAI-v2 share the singleton +TelemetryHandler, downstream chat spans/metrics emitted via the OpenAI-v2-style +LLM lifecycle inherit agent identity from the parent CrewAI agent span. +""" + +import os +from unittest import mock + +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAIAttributes, +) +from opentelemetry.semconv._incubating.metrics import gen_ai_metrics +from opentelemetry.util.genai.handler import get_telemetry_handler +from opentelemetry.util.genai.types import InputMessage, LLMInvocation, OutputMessage, Text + +import opentelemetry.instrumentation.crewai.instrumentation as crewai_module + + +def test_openai_v2_chat_inherits_agent_context_to_spans_and_metrics( + tracer_provider, meter_provider, metric_reader, span_exporter +): + # Ensure fresh singleton with test providers. + if hasattr(get_telemetry_handler, "_default_handler"): + delattr(get_telemetry_handler, "_default_handler") + + handler = get_telemetry_handler( + tracer_provider=tracer_provider, + meter_provider=meter_provider, + ) + crewai_module._handler = handler + + mock_agent = mock.MagicMock() + mock_agent.role = "Crew Router Agent" + mock_task = mock.MagicMock() + mock_task.description = "Route support query" + mock_task.expected_output = "Routing decision" + + # Simulates the core OpenAI-v2 instrumentation flow: + # build LLMInvocation -> start_llm -> set outputs/tokens -> stop_llm + def _agent_exec_side_effect(*_args, **_kwargs): + llm = LLMInvocation( + request_model="gpt-4o-mini", + input_messages=[ + InputMessage(role="user", parts=[Text(content="hello")]) + ], + provider="openai", + framework="openai-sdk", + system="openai", + ) + handler.start_llm(llm) + llm.output_messages = [ + OutputMessage( + role="assistant", + parts=[Text(content="ok")], + finish_reason="stop", + ) + ] + llm.response_model_name = "gpt-4o-mini-2024-07-18" + llm.input_tokens = 12 + llm.output_tokens = 5 + handler.stop_llm(llm) + return "agent-complete" + + wrapped = mock.MagicMock(side_effect=_agent_exec_side_effect) + result = crewai_module._wrap_agent_execute_task( + wrapped, mock_agent, (), {"task": mock_task} + ) + assert result == "agent-complete" + + # Validate chat span inherited agent identity. + spans = span_exporter.get_finished_spans() + chat_span = next((span for span in spans if span.name == "chat gpt-4o-mini"), None) + assert chat_span is not None + assert ( + chat_span.attributes.get(GenAIAttributes.GEN_AI_AGENT_NAME) + == "Crew Router Agent" + ) + inherited_agent_id = chat_span.attributes.get(GenAIAttributes.GEN_AI_AGENT_ID) + assert isinstance(inherited_agent_id, str) and inherited_agent_id + + # Validate chat metrics inherited agent identity. + try: + meter_provider.force_flush() + except Exception: + pass + metric_reader.collect() + + resource_metrics = metric_reader.get_metrics_data().resource_metrics + assert resource_metrics + metrics = resource_metrics[0].scope_metrics[0].metrics + + duration_metric = next( + ( + metric + for metric in metrics + if metric.name == gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION + ), + None, + ) + token_metric = next( + ( + metric + for metric in metrics + if metric.name == gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE + ), + None, + ) + + assert duration_metric is not None + assert token_metric is not None + + found_duration_with_agent = False + for point in duration_metric.data.data_points: + attrs = point.attributes + if ( + attrs.get(GenAIAttributes.GEN_AI_OPERATION_NAME) + == GenAIAttributes.GenAiOperationNameValues.CHAT.value + and attrs.get(GenAIAttributes.GEN_AI_AGENT_NAME) + == "Crew Router Agent" + and attrs.get(GenAIAttributes.GEN_AI_AGENT_ID) == inherited_agent_id + ): + found_duration_with_agent = True + break + assert found_duration_with_agent + + found_input_with_agent = False + found_output_with_agent = False + for point in token_metric.data.data_points: + attrs = point.attributes + if ( + attrs.get(GenAIAttributes.GEN_AI_OPERATION_NAME) + != GenAIAttributes.GenAiOperationNameValues.CHAT.value + ): + continue + if ( + attrs.get(GenAIAttributes.GEN_AI_AGENT_NAME) + != "Crew Router Agent" + or attrs.get(GenAIAttributes.GEN_AI_AGENT_ID) != inherited_agent_id + ): + continue + token_type = attrs.get(GenAIAttributes.GEN_AI_TOKEN_TYPE) + if token_type == GenAIAttributes.GenAiTokenTypeValues.INPUT.value: + found_input_with_agent = True + if token_type == GenAIAttributes.GenAiTokenTypeValues.COMPLETION.value: + found_output_with_agent = True + + assert found_input_with_agent + assert found_output_with_agent + + # Keep environment clean for neighboring tests that may inspect globals. + crewai_module._handler = None + os.environ.pop("OTEL_INSTRUMENTATION_GENAI_EMITTERS", None) diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/tests/test_singleton.py b/instrumentation-genai/opentelemetry-instrumentation-crewai/tests/test_singleton.py index 3cd38c00..67cb44ad 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-crewai/tests/test_singleton.py +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/tests/test_singleton.py @@ -64,14 +64,14 @@ def test_handler_receives_tracer_provider(self, tracer_provider, meter_provider) "opentelemetry.instrumentation.crewai.instrumentation.wrap_function_wrapper" ): with mock.patch( - "opentelemetry.instrumentation.crewai.instrumentation.TelemetryHandler" - ) as MockHandler: + "opentelemetry.instrumentation.crewai.instrumentation.get_telemetry_handler" + ) as mock_get_handler: instrumentor.instrument( tracer_provider=tracer_provider, meter_provider=meter_provider, ) - MockHandler.assert_called_once_with( + mock_get_handler.assert_called_once_with( tracer_provider=tracer_provider, meter_provider=meter_provider, ) @@ -96,12 +96,12 @@ def test_handler_uses_default_tracer_provider_when_not_provided( mock_get_tracer.return_value = mock_tracer_provider with mock.patch( - "opentelemetry.instrumentation.crewai.instrumentation.TelemetryHandler" - ) as MockHandler: + "opentelemetry.instrumentation.crewai.instrumentation.get_telemetry_handler" + ) as mock_get_handler: instrumentor.instrument(meter_provider=meter_provider) - MockHandler.assert_called_once() - call_kwargs = MockHandler.call_args[1] + mock_get_handler.assert_called_once() + call_kwargs = mock_get_handler.call_args[1] assert call_kwargs["tracer_provider"] == mock_tracer_provider # Cleanup @@ -124,12 +124,12 @@ def test_handler_uses_default_meter_provider_when_not_provided( mock_get_meter.return_value = mock_meter_provider with mock.patch( - "opentelemetry.instrumentation.crewai.instrumentation.TelemetryHandler" - ) as MockHandler: + "opentelemetry.instrumentation.crewai.instrumentation.get_telemetry_handler" + ) as mock_get_handler: instrumentor.instrument(tracer_provider=tracer_provider) - MockHandler.assert_called_once() - call_kwargs = MockHandler.call_args[1] + mock_get_handler.assert_called_once() + call_kwargs = mock_get_handler.call_args[1] assert call_kwargs["meter_provider"] == mock_meter_provider # Cleanup