Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
from wrapt import wrap_function_wrapper
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.handler import (
TelemetryHandler,
get_telemetry_handler,
)
from opentelemetry.util.genai.types import (
Workflow,
AgentInvocation,
Expand Down Expand Up @@ -108,7 +111,7 @@ def _instrument(self, **kwargs):

meter_provider = metrics.get_meter_provider()

_handler = TelemetryHandler(
_handler = get_telemetry_handler(
tracer_provider=tracer_provider, meter_provider=meter_provider
)
Comment on lines +114 to 116

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Singleton ignores providers 🐞 Bug ✓ Correctness

CrewAI now uses the shared get_telemetry_handler singleton; once any instrumentation initializes the
singleton, later calls with different tracer/meter providers are silently ignored, making telemetry
configuration order-dependent across instrumentations (e.g., OpenAI-v2 vs CrewAI). This can lead to
spans/metrics being emitted through unexpected providers/exporters with no warning.
Agent Prompt
### Issue description
`get_telemetry_handler()` caches a singleton `TelemetryHandler` and ignores later `tracer_provider`/`meter_provider`/`logger_provider` arguments once `_default_handler` is set. After this PR, CrewAI also uses that singleton, so the first instrumentor to run effectively “wins” provider configuration for all other instrumentations that share the singleton.

### Issue Context
- CrewAI now does `_handler = get_telemetry_handler(tracer_provider=..., meter_provider=...)`.
- OpenAI-v2 also does `handler = get_telemetry_handler(tracer_provider=..., meter_provider=..., logger_provider=...)`.
- `get_telemetry_handler()` only constructs the handler on first call and returns the cached instance thereafter.

### Fix Focus Areas
- util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py[1115-1133]
- instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/instrumentation.py[97-116]
- instrumentation-genai/opentelemetry-instrumentation-openai-v2/src/opentelemetry/instrumentation/openai_v2/__init__.py[68-79]

### Suggested approach
- Add a supported API path for deterministic initialization (e.g., `initialize_telemetry_handler(...)` / `set_default_telemetry_handler(handler)`), and have instrumentors use it.
- Additionally (or if you don’t want a new API), add mismatch detection in `get_telemetry_handler()`:
  - If a cached handler exists and any provider argument is non-None, emit a WARNING that the singleton is already configured and the passed providers are being ignored.
  - Document the “first-call wins” semantics clearly if that remains the intended behavior.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,22 @@ def meter_provider(metric_reader):
def reset_global_handler():
    """Isolate each test from the cached telemetry handlers.

    Saves the CrewAI module-level ``_handler``, clears both it and the
    ``get_telemetry_handler`` singleton cache before the test runs, then
    restores the module handler and clears the singleton again afterwards.
    """
    import opentelemetry.instrumentation.crewai.instrumentation as crewai_module
    from opentelemetry.util.genai.handler import get_telemetry_handler

    def _drop_singleton_cache():
        # get_telemetry_handler caches its singleton as a function attribute.
        if hasattr(get_telemetry_handler, "_default_handler"):
            delattr(get_telemetry_handler, "_default_handler")

    saved_handler = crewai_module._handler

    crewai_module._handler = None
    _drop_singleton_cache()

    yield

    crewai_module._handler = saved_handler
    _drop_singleton_cache()


@pytest.fixture(autouse=True)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""CrewAI + OpenAI-v2 context inheritance tests.

These tests validate that when CrewAI and OpenAI-v2 share the singleton
TelemetryHandler, downstream chat spans/metrics emitted via the OpenAI-v2-style
LLM lifecycle inherit agent identity from the parent CrewAI agent span.
"""

import os
from unittest import mock

from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics
from opentelemetry.util.genai.handler import get_telemetry_handler
from opentelemetry.util.genai.types import InputMessage, LLMInvocation, OutputMessage, Text

import opentelemetry.instrumentation.crewai.instrumentation as crewai_module


def test_openai_v2_chat_inherits_agent_context_to_spans_and_metrics(
    tracer_provider, meter_provider, metric_reader, span_exporter
):
    """Chat spans and metrics emitted via the shared singleton handler must
    inherit the agent name/id from the enclosing CrewAI agent invocation."""
    # Drop any cached singleton so the test providers are actually used.
    if hasattr(get_telemetry_handler, "_default_handler"):
        delattr(get_telemetry_handler, "_default_handler")

    handler = get_telemetry_handler(
        tracer_provider=tracer_provider,
        meter_provider=meter_provider,
    )
    crewai_module._handler = handler

    mock_agent = mock.MagicMock()
    mock_agent.role = "Crew Router Agent"
    mock_task = mock.MagicMock()
    mock_task.description = "Route support query"
    mock_task.expected_output = "Routing decision"

    def _agent_exec_side_effect(*_args, **_kwargs):
        # Mirrors the core OpenAI-v2 instrumentation flow:
        # build LLMInvocation -> start_llm -> set outputs/tokens -> stop_llm.
        invocation = LLMInvocation(
            request_model="gpt-4o-mini",
            input_messages=[
                InputMessage(role="user", parts=[Text(content="hello")])
            ],
            provider="openai",
            framework="openai-sdk",
            system="openai",
        )
        handler.start_llm(invocation)
        invocation.output_messages = [
            OutputMessage(
                role="assistant",
                parts=[Text(content="ok")],
                finish_reason="stop",
            )
        ]
        invocation.response_model_name = "gpt-4o-mini-2024-07-18"
        invocation.input_tokens = 12
        invocation.output_tokens = 5
        handler.stop_llm(invocation)
        return "agent-complete"

    wrapped = mock.MagicMock(side_effect=_agent_exec_side_effect)
    result = crewai_module._wrap_agent_execute_task(
        wrapped, mock_agent, (), {"task": mock_task}
    )
    assert result == "agent-complete"

    # The chat span must carry the parent agent's identity attributes.
    finished = span_exporter.get_finished_spans()
    chat_span = next(
        (span for span in finished if span.name == "chat gpt-4o-mini"), None
    )
    assert chat_span is not None
    assert (
        chat_span.attributes.get(GenAIAttributes.GEN_AI_AGENT_NAME)
        == "Crew Router Agent"
    )
    inherited_agent_id = chat_span.attributes.get(
        GenAIAttributes.GEN_AI_AGENT_ID
    )
    assert isinstance(inherited_agent_id, str) and inherited_agent_id

    # Flush and collect metrics; force_flush is best-effort only.
    try:
        meter_provider.force_flush()
    except Exception:
        pass
    metric_reader.collect()

    resource_metrics = metric_reader.get_metrics_data().resource_metrics
    assert resource_metrics
    collected = resource_metrics[0].scope_metrics[0].metrics

    def _find_metric(name):
        # First collected metric with the given name, or None.
        return next((m for m in collected if m.name == name), None)

    duration_metric = _find_metric(
        gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION
    )
    token_metric = _find_metric(gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE)
    assert duration_metric is not None
    assert token_metric is not None

    def _is_chat_point_with_agent(attrs):
        # True when the data point is a chat operation tagged with the
        # inherited agent name/id.
        return (
            attrs.get(GenAIAttributes.GEN_AI_OPERATION_NAME)
            == GenAIAttributes.GenAiOperationNameValues.CHAT.value
            and attrs.get(GenAIAttributes.GEN_AI_AGENT_NAME)
            == "Crew Router Agent"
            and attrs.get(GenAIAttributes.GEN_AI_AGENT_ID)
            == inherited_agent_id
        )

    # Duration metric: at least one chat data point carries the agent identity.
    assert any(
        _is_chat_point_with_agent(point.attributes)
        for point in duration_metric.data.data_points
    )

    # Token metric: both input and completion token points carry it.
    token_types_seen = {
        point.attributes.get(GenAIAttributes.GEN_AI_TOKEN_TYPE)
        for point in token_metric.data.data_points
        if _is_chat_point_with_agent(point.attributes)
    }
    assert (
        GenAIAttributes.GenAiTokenTypeValues.INPUT.value in token_types_seen
    )
    assert (
        GenAIAttributes.GenAiTokenTypeValues.COMPLETION.value
        in token_types_seen
    )

    # Keep environment clean for neighboring tests that may inspect globals.
    crewai_module._handler = None
    os.environ.pop("OTEL_INSTRUMENTATION_GENAI_EMITTERS", None)
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ def test_handler_receives_tracer_provider(self, tracer_provider, meter_provider)
"opentelemetry.instrumentation.crewai.instrumentation.wrap_function_wrapper"
):
with mock.patch(
"opentelemetry.instrumentation.crewai.instrumentation.TelemetryHandler"
) as MockHandler:
"opentelemetry.instrumentation.crewai.instrumentation.get_telemetry_handler"
) as mock_get_handler:
instrumentor.instrument(
tracer_provider=tracer_provider,
meter_provider=meter_provider,
)

MockHandler.assert_called_once_with(
mock_get_handler.assert_called_once_with(
tracer_provider=tracer_provider,
meter_provider=meter_provider,
)
Expand All @@ -96,12 +96,12 @@ def test_handler_uses_default_tracer_provider_when_not_provided(
mock_get_tracer.return_value = mock_tracer_provider

with mock.patch(
"opentelemetry.instrumentation.crewai.instrumentation.TelemetryHandler"
) as MockHandler:
"opentelemetry.instrumentation.crewai.instrumentation.get_telemetry_handler"
) as mock_get_handler:
instrumentor.instrument(meter_provider=meter_provider)

MockHandler.assert_called_once()
call_kwargs = MockHandler.call_args[1]
mock_get_handler.assert_called_once()
call_kwargs = mock_get_handler.call_args[1]
assert call_kwargs["tracer_provider"] == mock_tracer_provider

# Cleanup
Expand All @@ -124,12 +124,12 @@ def test_handler_uses_default_meter_provider_when_not_provided(
mock_get_meter.return_value = mock_meter_provider

with mock.patch(
"opentelemetry.instrumentation.crewai.instrumentation.TelemetryHandler"
) as MockHandler:
"opentelemetry.instrumentation.crewai.instrumentation.get_telemetry_handler"
) as mock_get_handler:
instrumentor.instrument(tracer_provider=tracer_provider)

MockHandler.assert_called_once()
call_kwargs = MockHandler.call_args[1]
mock_get_handler.assert_called_once()
call_kwargs = mock_get_handler.call_args[1]
assert call_kwargs["meter_provider"] == mock_meter_provider

# Cleanup
Expand Down