Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
gen_ai_attributes as GenAI,
)

from ..attributes import GEN_AI_WORKFLOW_NAME
from ..instruments import Instruments
from ..interfaces import EmitterMeta
from ..types import (
Expand Down Expand Up @@ -101,6 +102,13 @@ def on_end(self, obj: Any) -> None:
)
if llm_invocation.agent_id:
metric_attrs[GenAI.GEN_AI_AGENT_ID] = llm_invocation.agent_id
workflow_name = (
llm_invocation.attributes.get(GEN_AI_WORKFLOW_NAME)
if llm_invocation.attributes
else None
)
if workflow_name:
metric_attrs[GEN_AI_WORKFLOW_NAME] = workflow_name

_record_token_metrics(
self._token_histogram,
Expand Down Expand Up @@ -132,6 +140,13 @@ def on_end(self, obj: Any) -> None:
)
if tool_invocation.agent_id:
metric_attrs[GenAI.GEN_AI_AGENT_ID] = tool_invocation.agent_id
workflow_name = (
tool_invocation.attributes.get(GEN_AI_WORKFLOW_NAME)
if tool_invocation.attributes
else None
)
if workflow_name:
metric_attrs[GEN_AI_WORKFLOW_NAME] = workflow_name

_record_duration(
self._duration_histogram,
Expand Down Expand Up @@ -163,6 +178,13 @@ def on_end(self, obj: Any) -> None:
metric_attrs[GenAI.GEN_AI_AGENT_ID] = (
embedding_invocation.agent_id
)
workflow_name = (
embedding_invocation.attributes.get(GEN_AI_WORKFLOW_NAME)
if embedding_invocation.attributes
else None
)
if workflow_name:
metric_attrs[GEN_AI_WORKFLOW_NAME] = workflow_name

_record_duration(
self._duration_histogram,
Expand Down Expand Up @@ -203,6 +225,13 @@ def on_error(self, error: Error, obj: Any) -> None:
)
if llm_invocation.agent_id:
metric_attrs[GenAI.GEN_AI_AGENT_ID] = llm_invocation.agent_id
workflow_name = (
llm_invocation.attributes.get(GEN_AI_WORKFLOW_NAME)
if llm_invocation.attributes
else None
)
if workflow_name:
metric_attrs[GEN_AI_WORKFLOW_NAME] = workflow_name
if getattr(error, "type", None) is not None:
metric_attrs[ErrorAttributes.ERROR_TYPE] = (
error.type.__qualname__
Expand All @@ -228,6 +257,13 @@ def on_error(self, error: Error, obj: Any) -> None:
)
if tool_invocation.agent_id:
metric_attrs[GenAI.GEN_AI_AGENT_ID] = tool_invocation.agent_id
workflow_name = (
tool_invocation.attributes.get(GEN_AI_WORKFLOW_NAME)
if tool_invocation.attributes
else None
)
if workflow_name:
metric_attrs[GEN_AI_WORKFLOW_NAME] = workflow_name
if getattr(error, "type", None) is not None:
metric_attrs[ErrorAttributes.ERROR_TYPE] = (
error.type.__qualname__
Expand Down Expand Up @@ -260,6 +296,13 @@ def on_error(self, error: Error, obj: Any) -> None:
metric_attrs[GenAI.GEN_AI_AGENT_ID] = (
embedding_invocation.agent_id
)
workflow_name = (
embedding_invocation.attributes.get(GEN_AI_WORKFLOW_NAME)
if embedding_invocation.attributes
else None
)
if workflow_name:
metric_attrs[GEN_AI_WORKFLOW_NAME] = workflow_name
if getattr(error, "type", None) is not None:
metric_attrs[ErrorAttributes.ERROR_TYPE] = (
error.type.__qualname__
Expand Down Expand Up @@ -362,6 +405,13 @@ def _record_retrieval_metrics(
metric_attrs[GenAI.GEN_AI_AGENT_NAME] = retrieval.agent_name
if retrieval.agent_id:
metric_attrs[GenAI.GEN_AI_AGENT_ID] = retrieval.agent_id
workflow_name = (
retrieval.attributes.get(GEN_AI_WORKFLOW_NAME)
if retrieval.attributes
else None
)
if workflow_name:
metric_attrs[GEN_AI_WORKFLOW_NAME] = workflow_name
# Add error type if present
if error is not None and getattr(error, "type", None) is not None:
metric_attrs[ErrorAttributes.ERROR_TYPE] = error.type.__qualname__
Expand Down
110 changes: 110 additions & 0 deletions util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ def genai_debug_log(*_args: Any, **_kwargs: Any) -> None: # type: ignore
from opentelemetry.util.genai.emitters.configuration import (
build_emitter_pipeline,
)
from opentelemetry.util.genai.attributes import (
GEN_AI_AGENT_ID,
GEN_AI_AGENT_NAME,
GEN_AI_WORKFLOW_NAME,
)
from opentelemetry.util.genai.span_context import (
extract_span_context,
span_context_hex_ids,
Expand Down Expand Up @@ -254,6 +259,8 @@ def _get_eval_histogram(canonical_name: str):
self._evaluation_manager = None
# Active agent identity stack (name, id) for implicit propagation to nested operations
self._agent_context_stack: list[tuple[str, str]] = []
# Active workflow name stack for implicit propagation to nested operations
self._workflow_context_stack: list[str] = []
Comment on lines +262 to +263

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Workflow stack not context-safe 🐞 Bug ⛯ Reliability

TelemetryHandler stores workflow identity in a mutable instance stack that is shared by all callers
of the singleton handler, which can mis-attribute workflow name in concurrent/overlapping workflows
(threads/async tasks) and pollute downstream span/metric attributes.
Agent Prompt
### Issue description
`TelemetryHandler` tracks workflow identity in a mutable list on the handler instance. Because `get_telemetry_handler()` returns a singleton, that stack becomes shared mutable state across concurrent requests/tasks, which can cause `gen_ai.workflow.name` to be applied to the wrong nested spans/metrics.

### Issue Context
The PR already adds `_inherit_parent_context_attributes()` which can derive workflow/agent identity from the current active span. Prefer context-local propagation over a shared stack.

### Fix Focus Areas
- util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py[260-264]
- util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py[722-732]
- util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py[851-905]
- util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py[395-420]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

# Span registry (run_id -> Span) to allow parenting even after original invocation ended.
# We intentionally retain ended parent spans to preserve trace linkage for late children
# (e.g., final LLM call after agent/workflow termination). A lightweight size cap can be
Expand Down Expand Up @@ -340,6 +347,51 @@ def _refresh_capture_content(
except Exception:
pass

@staticmethod
def _get_current_span_attribute(key: str) -> Optional[Any]:
    """Best-effort lookup of *key* on the currently active span.

    Returns the attribute value when it can be read, otherwise ``None``.
    Never raises: any failure while resolving the span or reading its
    attributes is treated as "attribute not present".
    """
    try:
        active = _trace_mod.get_current_span()
    except Exception:
        return None
    if active is None:
        return None
    # Prefer the public ``attributes`` accessor; fall back to the
    # private ``_attributes`` slot used by some span implementations.
    attr_map = getattr(active, "attributes", None)
    if attr_map is None:
        attr_map = getattr(active, "_attributes", None)
    if not attr_map:
        return None
    try:
        return attr_map.get(key)
    except Exception:
        return None

def _inherit_parent_context_attributes(self, invocation: GenAI) -> None:
    """Fill in missing agent/workflow identity from the active span.

    Only fields the invocation has not already set are populated, and
    only with non-empty string values read from the parent span's
    attributes.
    """

    def _lookup(attr_key: str) -> Optional[str]:
        # Accept only non-empty strings; anything else counts as absent.
        value = self._get_current_span_attribute(attr_key)
        return value if isinstance(value, str) and value else None

    if not invocation.agent_name:
        inherited = _lookup(GEN_AI_AGENT_NAME)
        if inherited is not None:
            invocation.agent_name = inherited

    if not invocation.agent_id:
        inherited = _lookup(GEN_AI_AGENT_ID)
        if inherited is not None:
            invocation.agent_id = inherited

    if GEN_AI_WORKFLOW_NAME not in invocation.attributes:
        inherited = _lookup(GEN_AI_WORKFLOW_NAME)
        if inherited is not None:
            invocation.attributes[GEN_AI_WORKFLOW_NAME] = inherited

def start_llm(
self,
invocation: LLMInvocation,
Expand All @@ -357,6 +409,14 @@ def start_llm(
invocation.agent_name = top_name
if not invocation.agent_id:
invocation.agent_id = top_id
if (
GEN_AI_WORKFLOW_NAME not in invocation.attributes
and self._workflow_context_stack
):
invocation.attributes[GEN_AI_WORKFLOW_NAME] = (
self._workflow_context_stack[-1]
)
self._inherit_parent_context_attributes(invocation)
# Start invocation span; tracer context propagation handles parent/child links
self._emitter.on_start(invocation)
# Register span if created
Expand Down Expand Up @@ -475,6 +535,14 @@ def start_embedding(
invocation.agent_name = top_name
if not invocation.agent_id:
invocation.agent_id = top_id
if (
GEN_AI_WORKFLOW_NAME not in invocation.attributes
and self._workflow_context_stack
):
invocation.attributes[GEN_AI_WORKFLOW_NAME] = (
self._workflow_context_stack[-1]
)
self._inherit_parent_context_attributes(invocation)
invocation.start_time = time.time()
self._emitter.on_start(invocation)
span = getattr(invocation, "span", None)
Expand Down Expand Up @@ -541,6 +609,14 @@ def start_retrieval(
invocation.agent_name = top_name
if not invocation.agent_id:
invocation.agent_id = top_id
if (
GEN_AI_WORKFLOW_NAME not in invocation.attributes
and self._workflow_context_stack
):
invocation.attributes[GEN_AI_WORKFLOW_NAME] = (
self._workflow_context_stack[-1]
)
self._inherit_parent_context_attributes(invocation)
invocation.start_time = time.time()
self._emitter.on_start(invocation)
span = getattr(invocation, "span", None)
Expand Down Expand Up @@ -603,6 +679,14 @@ def start_tool_call(self, invocation: ToolCall) -> ToolCall:
invocation.agent_name = top_name
if not invocation.agent_id:
invocation.agent_id = top_id
if (
GEN_AI_WORKFLOW_NAME not in invocation.attributes
and self._workflow_context_stack
):
invocation.attributes[GEN_AI_WORKFLOW_NAME] = (
self._workflow_context_stack[-1]
)
self._inherit_parent_context_attributes(invocation)
self._emitter.on_start(invocation)
span = getattr(invocation, "span", None)
if span is not None:
Expand Down Expand Up @@ -643,6 +727,8 @@ def start_workflow(self, workflow: Workflow) -> Workflow:
if span is not None:
self._span_registry[str(workflow.run_id)] = span
self._entity_registry[str(workflow.run_id)] = workflow
if workflow.name:
self._workflow_context_stack.append(workflow.name)
return workflow

def _handle_evaluation_results(
Expand Down Expand Up @@ -784,6 +870,14 @@ def stop_workflow(self, workflow: Workflow) -> Workflow:
self._meter_provider.force_flush() # type: ignore[attr-defined]
except Exception:
pass
try:
if (
self._workflow_context_stack
and self._workflow_context_stack[-1] == workflow.name
):
self._workflow_context_stack.pop()
except Exception:
pass
return workflow

def fail_workflow(self, workflow: Workflow, error: Error) -> Workflow:
Expand All @@ -800,6 +894,14 @@ def fail_workflow(self, workflow: Workflow, error: Error) -> Workflow:
self._meter_provider.force_flush() # type: ignore[attr-defined]
except Exception:
pass
try:
if (
self._workflow_context_stack
and self._workflow_context_stack[-1] == workflow.name
):
self._workflow_context_stack.pop()
except Exception:
pass
return workflow

# Agent lifecycle -----------------------------------------------------
Expand All @@ -808,6 +910,14 @@ def start_agent(
) -> AgentCreation | AgentInvocation:
"""Start an agent operation (create or invoke) and create a pending span entry."""
self._refresh_capture_content()
if (
GEN_AI_WORKFLOW_NAME not in agent.attributes
and self._workflow_context_stack
):
agent.attributes[GEN_AI_WORKFLOW_NAME] = (
self._workflow_context_stack[-1]
)
self._inherit_parent_context_attributes(agent)
self._emitter.on_start(agent)
span = getattr(agent, "span", None)
if span is not None:
Expand Down
Loading