diff --git a/examples/README.md b/examples/README.md
index e748cf5a..ab7b7994 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -14,6 +14,7 @@ This directory contains runnable examples for Agent Control. Each example has it
 | Customer Support Agent | Enterprise scenario with PII protection, prompt-injection defense, and multiple tools. | https://docs.agentcontrol.dev/examples/customer-support |
 | DeepEval | Build a custom evaluator using DeepEval GEval metrics. | https://docs.agentcontrol.dev/examples/deepeval |
 | Galileo Luna-2 | Toxicity detection and content moderation with Galileo Protect. | https://docs.agentcontrol.dev/examples/galileo-luna2 |
 | LangChain SQL Agent | Protect a SQL agent from dangerous queries with server-side controls. | https://docs.agentcontrol.dev/examples/langchain-sql |
+| OTEL Merged Events | Create controls, merge local and server events, and inspect OTEL spans locally. | https://docs.agentcontrol.dev/examples/otel-merged-events |
 | Steer Action Demo | Banking transfer agent showcasing allow, deny, warn, and steer actions. | https://docs.agentcontrol.dev/examples/steer-action-demo |
 | AWS Strands | Guardrails for AWS Strands agent workflows and tool calls. | https://docs.agentcontrol.dev/examples/aws-strands |
diff --git a/examples/otel_merged_events/README.md b/examples/otel_merged_events/README.md
new file mode 100644
index 00000000..cafe7e17
--- /dev/null
+++ b/examples/otel_merged_events/README.md
@@ -0,0 +1,68 @@
+# OTEL Merged Events Example
+
+This example shows how Agent Control can:
+
+- create one SDK-local control and one server-side control
+- merge the resulting control execution events in the SDK
+- export the merged batch through the OTEL event sink
+- collect the exported OTEL spans locally with an in-memory exporter
+
+## What this example shows
+
+- control creation and agent association on the server
+- SDK-local and server-side evaluation in the same protected function call
+- merged-event OTEL emission without needing a live collector
+- the OTEL attributes emitted for each control execution
+
+## Prerequisites
+
+1. Start the Agent Control server from the repo root:
+
+```bash
+make server-run
+```
+
+2. Install the example dependencies:
+
+```bash
+cd examples/otel_merged_events
+uv pip install -e . --upgrade
+```
+
+## Setup
+
+Create the demo agent and controls:
+
+```bash
+cd examples/otel_merged_events
+uv run python setup_controls.py
+```
+
+This creates:
+
+- `otel-merged-local-input-check`
+- `otel-merged-server-input-check`
+
+Both controls use composite `and` conditions with multiple evaluator leaves so
+the exported OTEL spans include representative and aggregate metadata such as
+`primary_evaluator`, `primary_selector_path`, `leaf_count`,
+`all_evaluators`, and `all_selector_paths`.
+
+## Run
+
+Run the demo script:
+
+```bash
+cd examples/otel_merged_events
+uv run python demo_agent.py
+```
+
+The script prints:
+
+- the protected function result
+- the number of OTEL spans collected
+- one OTEL span per reconstructed control execution event
+
+This example uses an in-memory OTEL exporter so you can inspect the spans
+locally. In a production setup, the same merged-event sink can export to an
+OTLP endpoint instead.
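+
+## Example span attributes
+
+Each exported span follows the attribute mapping in
+`control_event_to_otel_attributes`. An illustrative (not verbatim) shape for a
+single control execution span:
+
+```python
+# Illustrative values for the local control above; exact output may differ.
+{
+    "gen_ai.system": "agent-control",
+    "agent_control.event_type": "control_execution",
+    "agent_control.control_name": "otel-merged-local-input-check",
+    "agent_control.check_stage": "pre",
+    "agent_control.applies_to": "llm_call",
+    "agent_control.action": "allow",
+    "agent_control.matched": True,
+    "agent_control.confidence": 1.0,
+    "agent_control.metadata.primary_evaluator": "regex",
+    "agent_control.metadata.leaf_count": 2,
+}
+```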
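+
+## Using a custom sink
+
+The OTEL sink is optional: any callable that accepts a list of
+`ControlExecutionEvent` objects can be registered as the merged-event sink
+instead. A minimal sketch using only the SDK's public helpers (register the
+sink before `agent_control.init()`, which leaves an already-registered sink in
+place rather than installing the OTEL one):
+
+```python
+from agent_control import control_event_to_otel_attributes, set_control_event_sink
+from agent_control_models import ControlExecutionEvent
+
+
+def print_sink(events: list[ControlExecutionEvent]) -> None:
+    for event in events:
+        # Reuse the SDK's own OTEL attribute mapping, which includes keys
+        # such as agent_control.control_name and agent_control.matched.
+        print(control_event_to_otel_attributes(event))
+
+
+set_control_event_sink(print_sink)
+```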
diff --git a/examples/otel_merged_events/demo_agent.py b/examples/otel_merged_events/demo_agent.py new file mode 100644 index 00000000..3eb0e4fa --- /dev/null +++ b/examples/otel_merged_events/demo_agent.py @@ -0,0 +1,361 @@ +#!/usr/bin/env python3 +"""Inspect merged control events and the OTEL spans emitted from them.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +import sys +from typing import Any + +os.environ.setdefault("AGENT_CONTROL_OTEL_ENABLED", "true") +os.environ.setdefault("AGENT_CONTROL_OTEL_SERVICE_NAME", "agent-control-otel-demo") + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%H:%M:%S", +) +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("httpcore").setLevel(logging.WARNING) + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../sdks/python/src")) + +from opentelemetry import trace +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + +import agent_control +from agent_control import AgentControlClient, emit_control_events, get_server_controls +from agent_control.evaluation import ( + _ControlAdapter, + _build_server_control_lookup, + _get_applicable_controls, + _has_applicable_prefiltered_server_controls, + _merge_results, +) +from agent_control.evaluation_events import build_control_execution_events +from agent_control.telemetry import has_control_event_sink +from agent_control.tracing import with_trace +from agent_control_engine.core import ControlEngine +from agent_control_models import ControlDefinition, EvaluationRequest, EvaluationResponse + + +AGENT_NAME = "otel-merged-events-demo-agent" +SERVER_URL = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") +MESSAGE = "local-trigger priority server-trigger elevated in one request" + + +def configure_in_memory_exporter() -> InMemorySpanExporter: + """Configure a tracer provider that stores exported spans in memory. + + Args: + None. + + Returns: + The configured in-memory OTEL exporter. + """ + exporter = InMemorySpanExporter() + provider = TracerProvider( + resource=Resource.create({"service.name": "agent-control-otel-demo"}) + ) + provider.add_span_processor(SimpleSpanProcessor(exporter)) + trace.set_tracer_provider(provider) + return exporter + + +def print_json_block(title: str, payload: dict[str, Any]) -> None: + """Print a titled JSON block. + + Args: + title: Section title to print. + payload: JSON-serializable payload. + + Returns: + None. + """ + print(f"\n{title}") + print("-" * len(title)) + print(json.dumps(payload, indent=2, sort_keys=True, default=str)) + + +def print_event_summary(label: str, events: list[Any]) -> None: + """Print a concise summary of reconstructed events. + + Args: + label: Section label to print. + events: Reconstructed control execution events. + + Returns: + None. + """ + print(f"\n{label}") + print("-" * len(label)) + if not events: + print("No events reconstructed.") + return + + for event in events: + print( + f"- control={event.control_name} stage={event.check_stage} " + f"matched={event.matched} action={event.action} " + f"trace_id={event.trace_id} parent_span_id={event.span_id}" + ) + + +def print_event_details(label: str, events: list[Any]) -> None: + """Print the full reconstructed control-event payloads. 
+ + Args: + label: Section label to print. + events: Reconstructed control execution events. + + Returns: + None. + """ + print(f"\n{label}") + print("-" * len(label)) + if not events: + print("No events reconstructed.") + return + + for index, event in enumerate(events, start=1): + print_json_block( + f"Event {index}", + event.model_dump(mode="json"), + ) + + +def span_to_collector_payload(span: Any) -> dict[str, Any]: + """Render a finished OTEL span as a collector-facing payload sample. + + Args: + span: Finished span captured by the in-memory exporter. + + Returns: + A simplified payload showing the fields an OTEL collector/backend + typically receives and indexes. + """ + resource = {} + if getattr(span, "resource", None) is not None: + resource = dict(sorted(dict(span.resource.attributes).items())) + + return { + "resource": resource, + "scope": { + "name": getattr(getattr(span, "instrumentation_scope", None), "name", None), + "version": getattr( + getattr(span, "instrumentation_scope", None), "version", None + ), + }, + "span": { + "name": span.name, + "trace_id": f"{span.context.trace_id:032x}", + "span_id": f"{span.context.span_id:016x}", + "parent_span_id": ( + f"{span.parent.span_id:016x}" if span.parent is not None else None + ), + "start_time_unix_nano": span.start_time, + "end_time_unix_nano": span.end_time, + "attributes": dict(sorted(dict(span.attributes).items())), + }, + } + + +def empty_response() -> EvaluationResponse: + """Return an empty evaluation response for merge convenience. + + Args: + None. + + Returns: + An empty successful evaluation response. + """ + return EvaluationResponse( + is_safe=True, + confidence=1.0, + reason=None, + matches=None, + errors=None, + non_matches=None, + ) + + +def partition_controls( + controls: list[dict[str, Any]], +) -> tuple[list[_ControlAdapter], list[dict[str, Any]]]: + """Split cached controls into SDK-local and server-side groups. + + Args: + controls: Raw control payloads cached in the SDK. + + Returns: + A tuple of parsed SDK-local controls and raw server control payloads. + """ + local_controls: list[_ControlAdapter] = [] + server_control_payloads: list[dict[str, Any]] = [] + + for control in controls: + control_data = control.get("control", {}) + if control_data.get("execution", "server") == "sdk": + local_controls.append( + _ControlAdapter( + id=control["id"], + name=control["name"], + control=ControlDefinition.model_validate(control_data), + ) + ) + else: + server_control_payloads.append(control) + + return local_controls, server_control_payloads + + +async def run_walkthrough(exporter: InMemorySpanExporter) -> None: + """Run the merged-event walkthrough and print each intermediate artifact. + + Args: + exporter: In-memory OTEL exporter that records finished spans. + + Returns: + None. 
+ """ + controls = get_server_controls() or [] + local_controls, server_control_payloads = partition_controls(controls) + request = EvaluationRequest( + agent_name=AGENT_NAME, + step={"type": "llm", "name": "draft_answer", "input": MESSAGE}, + stage="pre", + ) + + print("=" * 80) + print("Merged Control Events OTEL Demo") + print("=" * 80) + print(f"Input message: {MESSAGE}") + print(f"Loaded controls: {len(controls)}") + print( + f"SDK-local controls: {[control.name for control in local_controls]} | " + f"Server-side controls: {[control['name'] for control in server_control_payloads]}" + ) + print(f"Merged-event sink registered: {has_control_event_sink()}") + print( + "Composite controls are ordered intentionally so the collector output shows " + "different primary evaluator metadata for the local and server spans." + ) + + with with_trace() as (trace_id, span_id): + print(f"Trace context: trace_id={trace_id} span_id={span_id}") + + applicable_local_controls = _get_applicable_controls( + local_controls, + request, + context="sdk", + ) + print( + f"Applicable SDK-local controls on this input: " + f"{[control.name for control in applicable_local_controls]}" + ) + + local_result = empty_response() + local_events: list[Any] = [] + if applicable_local_controls: + local_engine = ControlEngine(applicable_local_controls, context="sdk") + local_result = await local_engine.process(request) + local_lookup = { + control.id: control.control for control in applicable_local_controls + } + local_events = build_control_execution_events( + local_result, + request, + local_lookup, + trace_id, + span_id, + AGENT_NAME, + ) + + print_json_block("Local evaluation response", local_result.model_dump(mode="json")) + print_event_summary("Reconstructed local events", local_events) + print_event_details("Local event details", local_events) + + server_result = empty_response() + server_events: list[Any] = [] + if _has_applicable_prefiltered_server_controls(server_control_payloads, request): + print( + f"Applicable server-side controls on this input: " + f"{[control['name'] for control in server_control_payloads]}" + ) + async with AgentControlClient(base_url=SERVER_URL) as client: + response = await client.http_client.post( + "/api/v1/evaluation", + json=request.model_dump(mode="json", exclude_none=True), + headers={ + "X-Trace-Id": trace_id, + "X-Span-Id": span_id, + "X-Agent-Control-Merge-Events": "true", + }, + ) + response.raise_for_status() + server_result = EvaluationResponse.model_validate(response.json()) + + server_lookup = _build_server_control_lookup(server_control_payloads) + server_events = build_control_execution_events( + server_result, + request, + server_lookup, + trace_id, + span_id, + AGENT_NAME, + ) + + print_json_block("Server evaluation response", server_result.model_dump(mode="json")) + print_event_summary("Reconstructed server events", server_events) + print_event_details("Server event details", server_events) + + merged_result = _merge_results(local_result, server_result) + merged_events = local_events + server_events + + print_json_block("Merged evaluation result", merged_result.model_dump(mode="json")) + print_event_summary("Final merged event batch", merged_events) + print_event_details("Merged event details", merged_events) + + emit_control_events(merged_events) + spans = exporter.get_finished_spans() + + print("\nCollector output") + print("----------------") + print(f"Collected OTEL spans: {len(spans)}") + for index, span in enumerate(spans, start=1): + print_json_block( + f"Collector 
payload for span {index}", + span_to_collector_payload(span), + ) + + +async def main() -> None: + """Initialize the demo agent and run the OTEL walkthrough. + + Args: + None. + + Returns: + None. + """ + exporter = configure_in_memory_exporter() + + agent_control.init( + agent_name=AGENT_NAME, + agent_description="Demo agent for OTEL merged-event emission", + server_url=SERVER_URL, + ) + + await run_walkthrough(exporter) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/otel_merged_events/pyproject.toml b/examples/otel_merged_events/pyproject.toml new file mode 100644 index 00000000..3a1aa4ca --- /dev/null +++ b/examples/otel_merged_events/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "agent-control-otel-merged-events-example" +version = "0.1.0" +description = "Merged control event OTEL export example for Agent Control" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "agent-control-engine", + "agent-control-evaluators", + "agent-control-models", + "agent-control-sdk[otel]", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["."] + +[tool.uv.sources] +agent-control-sdk = { path = "../../sdks/python", editable = true } +agent-control-models = { path = "../../models", editable = true } +agent-control-engine = { path = "../../engine", editable = true } +agent-control-evaluators = { path = "../../evaluators/builtin", editable = true } diff --git a/examples/otel_merged_events/setup_controls.py b/examples/otel_merged_events/setup_controls.py new file mode 100644 index 00000000..499ba173 --- /dev/null +++ b/examples/otel_merged_events/setup_controls.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 +"""Create the controls used by the OTEL merged-events demo.""" + +from __future__ import annotations + +import asyncio +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../sdks/python/src")) + +from agent_control import AgentControlClient + + +AGENT_NAME = "otel-merged-events-demo-agent" +SERVER_URL = os.getenv("AGENT_CONTROL_URL", "http://localhost:8000") + + +async def create_agent(client: AgentControlClient) -> None: + """Create or fetch the demo agent. + + Args: + client: Configured Agent Control client. + + Returns: + None. + """ + response = await client.http_client.post( + "/api/v1/agents/initAgent", + json={ + "agent": { + "agent_name": AGENT_NAME, + "agent_description": "Demo agent for OTEL merged-event emission", + }, + "steps": [], + }, + ) + response.raise_for_status() + created = response.json().get("created", False) + label = "Created" if created else "Fetched" + print(f"{label} agent: {AGENT_NAME}") + + +async def create_control( + client: AgentControlClient, + name: str, + control_definition: dict[str, object], +) -> int: + """Create a control and return its ID. + + Args: + client: Configured Agent Control client. + name: Control name. + control_definition: Control definition payload. + + Returns: + The created control ID. 
+ """ + response = await client.http_client.put( + "/api/v1/controls", + json={"name": name, "data": control_definition}, + ) + control_exists = response.status_code == 409 + if control_exists: + response = await client.http_client.get("/api/v1/controls", params={"name": name}) + response.raise_for_status() + controls = [ + control + for control in response.json().get("controls", []) + if control.get("name") == name + ] + if not controls: + raise RuntimeError(f"Could not find existing control named '{name}'") + + control_id = controls[0]["id"] + response = await client.http_client.put( + f"/api/v1/controls/{control_id}/data", + json={"data": control_definition}, + ) + response.raise_for_status() + print(f"Updated existing control '{name}' with id {control_id}") + return control_id + + response.raise_for_status() + control_id = response.json()["control_id"] + print(f"Created control '{name}' with id {control_id}") + return control_id + + +async def attach_control( + client: AgentControlClient, + control_id: int, +) -> None: + """Attach a control to the demo agent. + + Args: + client: Configured Agent Control client. + control_id: Control ID to attach. + + Returns: + None. + """ + response = await client.http_client.post( + f"/api/v1/agents/{AGENT_NAME}/controls/{control_id}" + ) + if response.status_code not in (200, 409): + response.raise_for_status() + print(f"Attached control {control_id} to agent {AGENT_NAME}") + + +async def main() -> None: + """Create the demo controls. + + Args: + None. + + Returns: + None. + """ + async with AgentControlClient(base_url=SERVER_URL) as client: + await create_agent(client) + + local_control_id = await create_control( + client, + "otel-merged-local-input-check", + { + "description": ( + "SDK-local composite control for merged OTEL event export demo" + ), + "enabled": True, + "execution": "sdk", + "scope": {"step_types": ["llm"], "stages": ["pre"]}, + "condition": { + "and": [ + { + "selector": {"path": "input"}, + "evaluator": { + "name": "regex", + "config": {"pattern": "local-trigger", "flags": []}, + }, + }, + { + "selector": {"path": "input"}, + "evaluator": { + "name": "list", + "config": { + "values": ["priority"], + "logic": "any", + "match_on": "match", + "match_mode": "contains", + "case_sensitive": False, + }, + }, + }, + ] + }, + "action": {"decision": "allow"}, + "tags": ["otel", "merged-events", "sdk"], + }, + ) + + server_control_id = await create_control( + client, + "otel-merged-server-input-check", + { + "description": ( + "Server-side composite control for merged OTEL event export demo" + ), + "enabled": True, + "execution": "server", + "scope": {"step_types": ["llm"], "stages": ["pre"]}, + "condition": { + "and": [ + { + "selector": {"path": "input"}, + "evaluator": { + "name": "list", + "config": { + "values": ["server-trigger"], + "logic": "any", + "match_on": "match", + "match_mode": "contains", + "case_sensitive": False, + }, + }, + }, + { + "selector": {"path": "input"}, + "evaluator": { + "name": "regex", + "config": {"pattern": "elevated", "flags": []}, + }, + }, + ] + }, + "action": {"decision": "allow"}, + "tags": ["otel", "merged-events", "server"], + }, + ) + + await attach_control(client, local_control_id) + await attach_control(client, server_control_id) + + print("\nSetup complete.") + print("Run demo_agent.py to trigger both controls and collect OTEL spans.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/models/src/agent_control_models/evaluation.py b/models/src/agent_control_models/evaluation.py 
index 07ab4810..458c91a5 100644 --- a/models/src/agent_control_models/evaluation.py +++ b/models/src/agent_control_models/evaluation.py @@ -127,8 +127,6 @@ class EvaluationResponse(BaseModel): default=None, description="List of controls that were evaluated but did not match (if any)", ) - - class EvaluationResult(EvaluationResponse): """ Client-side result model for evaluation analysis. diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 590043b6..6986268c 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -38,6 +38,11 @@ Repository = "https://github.com/yourusername/agent-control" strands-agents = ["strands-agents>=1.26.0"] google-adk = ["google-adk>=1.0.0"] galileo = ["agent-control-evaluator-galileo>=3.0.0"] +otel = [ + "opentelemetry-api>=1.28.0", + "opentelemetry-sdk>=1.28.0", + "opentelemetry-exporter-otlp-proto-http>=1.28.0", +] [dependency-groups] dev = [ diff --git a/sdks/python/src/agent_control/__init__.py b/sdks/python/src/agent_control/__init__.py index 24353411..31a92b69 100644 --- a/sdks/python/src/agent_control/__init__.py +++ b/sdks/python/src/agent_control/__init__.py @@ -95,8 +95,17 @@ async def handle_input(user_message: str) -> str: sync_shutdown_observability, ) from .telemetry import ( + clear_control_event_sink, clear_trace_context_provider, + configure_otel_event_sink, + control_event_to_otel_attributes, + control_event_to_otel_span, + create_otel_event_sink, + emit_control_events, get_trace_context_from_provider, + has_control_event_sink, + is_otel_event_emission_configured, + set_control_event_sink, set_trace_context_provider, ) from .tracing import ( @@ -619,6 +628,9 @@ def run_in_thread() -> None: if batcher: logger.info("Observability enabled") + if configure_otel_event_sink(): + logger.info("OTEL merged-event emission enabled") + if policy_refresh_interval_seconds > 0: _start_policy_refresh_loop(policy_refresh_interval_seconds) else: @@ -1307,6 +1319,15 @@ async def main(): "set_trace_context_provider", "get_trace_context_from_provider", "clear_trace_context_provider", + "configure_otel_event_sink", + "control_event_to_otel_attributes", + "control_event_to_otel_span", + "create_otel_event_sink", + "set_control_event_sink", + "has_control_event_sink", + "is_otel_event_emission_configured", + "emit_control_events", + "clear_control_event_sink", # Observability "init_observability", "add_event", diff --git a/sdks/python/src/agent_control/evaluation.py b/sdks/python/src/agent_control/evaluation.py index d76af177..b9de8d1c 100644 --- a/sdks/python/src/agent_control/evaluation.py +++ b/sdks/python/src/agent_control/evaluation.py @@ -1,14 +1,12 @@ """Evaluation check operations for Agent Control SDK.""" from dataclasses import dataclass -from datetime import UTC, datetime from typing import Any, Literal, cast from agent_control_engine import list_evaluators from agent_control_engine.core import ControlEngine from agent_control_models import ( ControlDefinition, - ControlExecutionEvent, ControlMatch, EvaluationRequest, EvaluationResponse, @@ -19,139 +17,11 @@ from ._state import state from .client import AgentControlClient -from .observability import add_event, get_logger, is_observability_enabled +from .evaluation_events import build_control_execution_events, enqueue_observability_events +from .telemetry import emit_control_events, has_control_event_sink from .tracing import get_trace_and_span_ids from .validation import ensure_agent_name -_logger = get_logger(__name__) - -# Fallback IDs used when trace context is 
missing. -# All-zero values are invalid trace/span IDs per OpenTelemetry. -_FALLBACK_TRACE_ID = "0" * 32 -_FALLBACK_SPAN_ID = "0" * 16 -_trace_warning_logged = False - - -def _observability_metadata( - control_def: ControlDefinition, -) -> tuple[str | None, str | None, dict[str, object]]: - """Return representative event fields plus full composite context.""" - identity = control_def.observability_identity() - return ( - identity.selector_path, - identity.evaluator_name, - { - "primary_evaluator": identity.evaluator_name, - "primary_selector_path": identity.selector_path, - "leaf_count": identity.leaf_count, - "all_evaluators": identity.all_evaluators, - "all_selector_paths": identity.all_selector_paths, - }, - ) - - -def _map_applies_to(step_type: str) -> Literal["llm_call", "tool_call"]: - return "tool_call" if step_type == "tool" else "llm_call" - - -def _emit_local_events( - local_result: "EvaluationResponse", - request: "EvaluationRequest", - local_controls: list["_ControlAdapter"], - trace_id: str | None, - span_id: str | None, - agent_name: str | None, -) -> None: - """Emit observability events for locally-evaluated controls. - - Mirrors the server's _emit_observability_events() so that SDK-evaluated - controls are visible in the observability pipeline. - - When trace_id/span_id are missing, fallback all-zero IDs are used so events - are still recorded (but clearly marked as uncorrelated). - - Only runs when observability is enabled. - """ - if not is_observability_enabled(): - return - - global _trace_warning_logged # noqa: PLW0603 - if not trace_id or not span_id: - if not _trace_warning_logged: - _logger.warning( - "Emitting local control events without trace context; " - "events will use fallback IDs and cannot be correlated with traces. " - "Pass trace_id/span_id for full observability." 
- ) - _trace_warning_logged = True - trace_id = trace_id or _FALLBACK_TRACE_ID - span_id = span_id or _FALLBACK_SPAN_ID - - applies_to = _map_applies_to(request.step.type) - control_lookup = {c.id: c for c in local_controls} - now = datetime.now(UTC) - resolved_agent_name = agent_name or request.agent_name - - def _emit_matches(matches: list[ControlMatch] | None, matched: bool) -> None: - if not matches: - return - for match in matches: - ctrl = control_lookup.get(match.control_id) - event_metadata = dict(match.result.metadata or {}) - selector_path = None - evaluator_name = None - if ctrl: - selector_path, evaluator_name, identity_metadata = _observability_metadata( - ctrl.control - ) - event_metadata.update(identity_metadata) - add_event( - ControlExecutionEvent( - control_execution_id=match.control_execution_id, - trace_id=trace_id, - span_id=span_id, - agent_name=resolved_agent_name, - control_id=match.control_id, - control_name=match.control_name, - check_stage=request.stage, - applies_to=applies_to, - action=match.action, - matched=matched, - confidence=match.result.confidence, - timestamp=now, - evaluator_name=evaluator_name, - selector_path=selector_path, - error_message=match.result.error if not matched else None, - metadata=event_metadata, - ) - ) - - _emit_matches(local_result.matches, matched=True) - _emit_matches(local_result.errors, matched=False) - _emit_matches(local_result.non_matches, matched=False) - - -async def check_evaluation( - client: AgentControlClient, - agent_name: str, - step: "Step", - stage: Literal["pre", "post"], -) -> EvaluationResult: - """Check if agent interaction is safe.""" - normalized_name = ensure_agent_name(agent_name) - - request = EvaluationRequest( - agent_name=normalized_name, - step=step, - stage=stage, - ) - request_payload = request.model_dump(mode="json") - - response = await client.http_client.post("/api/v1/evaluation", json=request_payload) - response.raise_for_status() - - return cast(EvaluationResult, EvaluationResult.from_dict(response.json())) - @dataclass class _ControlAdapter: @@ -159,7 +29,7 @@ class _ControlAdapter: id: int name: str - control: "ControlDefinition" + control: ControlDefinition def _get_applicable_controls( @@ -176,6 +46,35 @@ def _get_applicable_controls( return cast(list[_ControlAdapter], applicable_controls) +def _build_server_control_lookup( + server_control_payloads: list[dict[str, Any]], +) -> dict[int, ControlDefinition]: + """Build a best-effort lookup of server control definitions. + + The merged-event path reconstructs server-side events in the SDK after the + server returns a lightweight ``EvaluationResponse``. This helper parses the + cached server control payloads so the shared event builder can reconstruct + those events locally. + + Args: + server_control_payloads: Raw cached server control payloads. + + Returns: + A mapping of control ID to parsed ``ControlDefinition`` for every + payload that can be parsed locally. + """ + control_lookup: dict[int, ControlDefinition] = {} + + for control in server_control_payloads: + try: + control_lookup[control["id"]] = ControlDefinition.model_validate(control["control"]) + except Exception: + # The server remains authoritative for malformed/unparseable controls. 
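+            # A control missing from this lookup still yields a reconstructed
+            # event; that event just lacks the evaluator/selector identity
+            # metadata a parsed definition would have provided.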
+ continue + + return control_lookup + + def _has_applicable_prefiltered_server_controls( server_control_payloads: list[dict[str, Any]], request: EvaluationRequest, @@ -218,10 +117,23 @@ def _has_applicable_prefiltered_server_controls( def _merge_results( - local_result: "EvaluationResponse", - server_result: "EvaluationResponse", -) -> "EvaluationResult": - """Merge local and server evaluation results.""" + local_result: EvaluationResponse, + server_result: EvaluationResponse, +) -> EvaluationResult: + """Merge local and server evaluation results into one SDK-facing result. + + This helper merges only evaluation semantics. Event reconstruction happens + later so the response shape can stay lightweight regardless of which event + ingestion path is used. + + Args: + local_result: Evaluation response produced by SDK-local controls. + server_result: Evaluation response produced by server-side controls. + + Returns: + A merged ``EvaluationResult`` with combined matches, errors, + non-matches, and the strictest safety/confidence outcome. + """ is_safe = local_result.is_safe and server_result.is_safe confidence = min(local_result.confidence, server_result.confidence) @@ -255,41 +167,64 @@ def _merge_results( ) +async def check_evaluation( + client: AgentControlClient, + agent_name: str, + step: Step, + stage: Literal["pre", "post"], +) -> EvaluationResult: + """Check if agent interaction is safe.""" + normalized_name = ensure_agent_name(agent_name) + + request = EvaluationRequest( + agent_name=normalized_name, + step=step, + stage=stage, + ) + request_payload = request.model_dump(mode="json") + + response = await client.http_client.post("/api/v1/evaluation", json=request_payload) + response.raise_for_status() + + return cast(EvaluationResult, EvaluationResult.from_dict(response.json())) + + async def check_evaluation_with_local( client: AgentControlClient, agent_name: str, - step: "Step", + step: Step, stage: Literal["pre", "post"], controls: list[dict[str, Any]], trace_id: str | None = None, span_id: str | None = None, event_agent_name: str | None = None, ) -> EvaluationResult: - """ - Check if agent interaction is safe, running local controls first. + """Evaluate controls with local-first execution and configurable event flow. - This function executes controls with execution="sdk" locally in the SDK, - then calls the server for execution="server" controls. If a local control - denies, it short-circuits and returns immediately without calling the server. + This is the main decision boundary between the two supported event + ingestion styles: + - default behavior: local events are reconstructed and queued immediately in + the SDK, while server-side events are still emitted by the server + - merged-event behavior: local and server events are reconstructed in the + SDK and emitted once through a registered sink - Note on parse errors: If a local control fails to parse/validate, it is - skipped (logged as WARNING) and the error is included in result.errors. - This does NOT affect is_safe or confidence—callers concerned with safety - should check result.errors for any parse failures. + In both cases, the evaluation result itself stays lightweight and event + reconstruction happens after evaluation completes. 
Args: - client: AgentControlClient instance - agent_name: Normalized agent identifier - step: Step payload to evaluate - stage: 'pre' for pre-execution check, 'post' for post-execution check - controls: List of control dicts from initAgent response - (each has 'id', 'name', 'control' keys) + client: Configured AgentControl client. + agent_name: Agent name to evaluate against. + step: Step payload to evaluate. + stage: Evaluation stage, ``pre`` or ``post``. + controls: Cached control payloads used to split local vs server + execution. + trace_id: Optional explicit trace ID. + span_id: Optional explicit span ID. + event_agent_name: Optional override for the agent name stamped on + reconstructed events. Returns: - EvaluationResult with safety analysis (merged from local + server) - - Raises: - httpx.HTTPError: If server request fails + A merged evaluation result across local and server execution. """ normalized_name = ensure_agent_name(agent_name) resolved_trace_id = trace_id @@ -299,7 +234,6 @@ async def check_evaluation_with_local( resolved_trace_id = trace_id or current_trace_id resolved_span_id = span_id or current_span_id - # Partition controls by local flag local_controls: list[_ControlAdapter] = [] parse_errors: list[ControlMatch] = [] available_evaluators = list_evaluators() @@ -344,12 +278,6 @@ async def check_evaluation_with_local( except Exception as exc: control_id = control.get("id", -1) control_name = control.get("name", "unknown") - _logger.warning( - "Skipping invalid local control '%s' (id=%s): %s", - control_name, - control_id, - exc, - ) parse_errors.append( ControlMatch( control_id=control_id, @@ -374,16 +302,12 @@ def _with_parse_errors(result: EvaluationResult) -> EvaluationResult: if not parse_errors: return result combined_errors = (result.errors or []) + parse_errors - return EvaluationResult( - is_safe=result.is_safe, - confidence=result.confidence, - reason=result.reason, - matches=result.matches, - errors=combined_errors, - non_matches=result.non_matches, - ) + return result.model_copy(update={"errors": combined_errors}) + + merged_emission_enabled = has_control_event_sink() local_result: EvaluationResponse | None = None + local_events = [] applicable_local_controls = _get_applicable_controls( local_controls, request, @@ -392,27 +316,26 @@ def _with_parse_errors(result: EvaluationResult) -> EvaluationResult: if applicable_local_controls: engine = ControlEngine(applicable_local_controls, context="sdk") local_result = await engine.process(request) - - _emit_local_events( + local_control_lookup = { + control.id: control.control for control in applicable_local_controls + } + local_events = build_control_execution_events( local_result, request, - applicable_local_controls, + local_control_lookup, resolved_trace_id, resolved_span_id, - agent_name=event_agent_name, + event_agent_name, ) + if not merged_emission_enabled: + enqueue_observability_events(local_events) + if not local_result.is_safe: - return _with_parse_errors( - EvaluationResult( - is_safe=local_result.is_safe, - confidence=local_result.confidence, - reason=local_result.reason, - matches=local_result.matches, - errors=local_result.errors, - non_matches=local_result.non_matches, - ) - ) + result = _with_parse_errors(EvaluationResult.model_validate(local_result.model_dump())) + if merged_emission_enabled: + emit_control_events(local_events) + return result if _has_applicable_prefiltered_server_controls(server_control_payloads, request): request_payload = request.model_dump(mode="json", exclude_none=True) @@ 
-421,6 +344,8 @@ def _with_parse_errors(result: EvaluationResult) -> EvaluationResult: headers["X-Trace-Id"] = resolved_trace_id if resolved_span_id: headers["X-Span-Id"] = resolved_span_id + if merged_emission_enabled: + headers["X-Agent-Control-Merge-Events"] = "true" response = await client.http_client.post( "/api/v1/evaluation", @@ -429,32 +354,32 @@ def _with_parse_errors(result: EvaluationResult) -> EvaluationResult: ) response.raise_for_status() server_result = EvaluationResponse.model_validate(response.json()) + server_control_lookup = _build_server_control_lookup(server_control_payloads) + server_events = build_control_execution_events( + server_result, + request, + server_control_lookup, + resolved_trace_id, + resolved_span_id, + event_agent_name, + ) if local_result is not None: - return _with_parse_errors(_merge_results(local_result, server_result)) - - return _with_parse_errors( - EvaluationResult( - is_safe=server_result.is_safe, - confidence=server_result.confidence, - reason=server_result.reason, - matches=server_result.matches, - errors=server_result.errors, - non_matches=server_result.non_matches, - ) - ) + result = _with_parse_errors(_merge_results(local_result, server_result)) + if merged_emission_enabled: + emit_control_events(local_events + server_events) + return result + + result = _with_parse_errors(EvaluationResult.model_validate(server_result.model_dump())) + if merged_emission_enabled: + emit_control_events(server_events) + return result if local_result is not None: - return _with_parse_errors( - EvaluationResult( - is_safe=local_result.is_safe, - confidence=local_result.confidence, - reason=local_result.reason, - matches=local_result.matches, - errors=local_result.errors, - non_matches=local_result.non_matches, - ) - ) + result = _with_parse_errors(EvaluationResult.model_validate(local_result.model_dump())) + if merged_emission_enabled: + emit_control_events(local_events) + return result return _with_parse_errors(EvaluationResult(is_safe=True, confidence=1.0)) @@ -471,58 +396,10 @@ async def evaluate_controls( trace_id: str | None = None, span_id: str | None = None, ) -> EvaluationResult: - """ - Evaluate controls for a step. - - This convenience function evaluates controls (both local SDK-executed and - server-executed) for a given step. 
- - Args: - step_name: Name of the step (e.g., "chat", "search_db") - input: Input data for the step (for pre-stage evaluation) - output: Output data from the step (for post-stage evaluation) - context: Additional context metadata - step_type: Type of step - "llm" or "tool" (default: "llm") - stage: When to evaluate - "pre" or "post" (default: "pre") - agent_name: Agent name (required) - trace_id: Optional OpenTelemetry trace ID for observability - span_id: Optional OpenTelemetry span ID for observability - - Returns: - EvaluationResult with is_safe, confidence, reason, matches, errors - - Raises: - httpx.HTTPError: If server request fails - - Example: - import agent_control - - # Evaluate controls for an agent - result = await agent_control.evaluate_controls( - "chat", - input="User message here", - stage="pre", - agent_name="customer-service-bot" - ) - - # With trace/span IDs for observability - result = await agent_control.evaluate_controls( - "chat", - input="User message", - stage="pre", - agent_name="customer-service-bot", - trace_id="4bf92f3577b34da6a3ce929d0e0e4736", - span_id="00f067aa0ba902b7" - ) - """ - # Ensure server_url is set (for mypy type narrowing) + """Evaluate controls for a step.""" if state.server_url is None: - raise RuntimeError( - "Server URL not configured. Call agent_control.init() first." - ) + raise RuntimeError("Server URL not configured. Call agent_control.init() first.") - # Build Step dict (input and output are required by Step model) - # Tool steps require dict input/output, LLM steps use strings default_value = {} if step_type == "tool" else "" step_dict: dict[str, Any] = { "type": step_type, @@ -533,15 +410,11 @@ async def evaluate_controls( if context is not None: step_dict["context"] = context - # Convert to Step object if models available - step_obj = Step(**step_dict) # type: ignore - - # Get controls from server cache + step_obj = Step(**step_dict) # type: ignore[arg-type] resolved_controls = state.server_controls or [] - # Evaluate using local + server controls async with AgentControlClient(base_url=state.server_url, api_key=state.api_key) as client: - result = await check_evaluation_with_local( + return await check_evaluation_with_local( client=client, agent_name=agent_name, step=step_obj, @@ -551,5 +424,3 @@ async def evaluate_controls( span_id=span_id, event_agent_name=agent_name, ) - - return result diff --git a/sdks/python/src/agent_control/evaluation_events.py b/sdks/python/src/agent_control/evaluation_events.py new file mode 100644 index 00000000..c21ac08a --- /dev/null +++ b/sdks/python/src/agent_control/evaluation_events.py @@ -0,0 +1,210 @@ +"""Derived control-execution event reconstruction for SDK evaluation flows.""" + +from datetime import UTC, datetime +from typing import Literal + +from agent_control_models import ( + ControlDefinition, + ControlExecutionEvent, + ControlMatch, + EvaluationRequest, + EvaluationResponse, +) + +from .observability import add_event, get_logger, is_observability_enabled + +_logger = get_logger(__name__) + +# All-zero values are invalid trace/span IDs per OpenTelemetry and make it +# obvious that the event could not be correlated to an external trace. 
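+# Events stamped with these fallback IDs deliberately fail OTEL ID validation:
+# _build_parent_context in telemetry/otel.py will not attach a parent span for
+# them, so they surface as standalone, uncorrelated spans.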
+_FALLBACK_TRACE_ID = "0" * 32 +_FALLBACK_SPAN_ID = "0" * 16 +_trace_warning_logged = False + + +def observability_metadata( + control_def: ControlDefinition, +) -> tuple[str | None, str | None, dict[str, object]]: + """Return representative event fields plus full composite context.""" + identity = control_def.observability_identity() + return ( + identity.selector_path, + identity.evaluator_name, + { + "primary_evaluator": identity.evaluator_name, + "primary_selector_path": identity.selector_path, + "leaf_count": identity.leaf_count, + "all_evaluators": identity.all_evaluators, + "all_selector_paths": identity.all_selector_paths, + }, + ) + + +def map_applies_to(step_type: str) -> Literal["llm_call", "tool_call"]: + """Map Agent Control step types to observability applies_to values.""" + return "tool_call" if step_type == "tool" else "llm_call" + + +def _resolve_event_trace_context( + trace_id: str | None, + span_id: str | None, +) -> tuple[str, str]: + """Return event IDs, applying fallback IDs and a one-time warning if needed.""" + global _trace_warning_logged # noqa: PLW0603 + + if trace_id and span_id: + return trace_id, span_id + + if not _trace_warning_logged: + _logger.warning( + "Emitting control events without trace context; events will use fallback " + "IDs and cannot be correlated with traces. Pass trace_id/span_id for " + "full observability." + ) + _trace_warning_logged = True + + return trace_id or _FALLBACK_TRACE_ID, span_id or _FALLBACK_SPAN_ID + + +def _build_events_for_matches( + matches: list[ControlMatch] | None, + *, + matched: bool, + request: EvaluationRequest, + control_lookup: dict[int, ControlDefinition], + trace_id: str, + span_id: str, + agent_name: str, + now: datetime, +) -> list[ControlExecutionEvent]: + if not matches: + return [] + + applies_to = map_applies_to(request.step.type) + events: list[ControlExecutionEvent] = [] + + for match in matches: + control_def = control_lookup.get(match.control_id) + event_metadata = dict(match.result.metadata or {}) + selector_path = None + evaluator_name = None + + if control_def is not None: + selector_path, evaluator_name, identity_metadata = observability_metadata(control_def) + event_metadata.update(identity_metadata) + + events.append( + ControlExecutionEvent( + control_execution_id=match.control_execution_id, + trace_id=trace_id, + span_id=span_id, + agent_name=agent_name, + control_id=match.control_id, + control_name=match.control_name, + check_stage=request.stage, + applies_to=applies_to, + action=match.action, + matched=matched, + confidence=match.result.confidence, + timestamp=now, + evaluator_name=evaluator_name, + selector_path=selector_path, + error_message=match.result.error if not matched else None, + metadata=event_metadata, + ) + ) + + return events + + +def build_control_execution_events( + response: EvaluationResponse, + request: EvaluationRequest, + control_lookup: dict[int, ControlDefinition], + trace_id: str | None, + span_id: str | None, + agent_name: str | None, +) -> list[ControlExecutionEvent]: + """Reconstruct control execution events from an evaluation response. + + This is the shared reconstruction step used by both supported ingestion + styles: + - the default SDK observability path, where reconstructed local events are + queued into the existing SDK batcher + - the merged-event path, where local and server events are reconstructed in + the SDK and emitted together through a registered sink + + Args: + response: Evaluation response containing matches, errors, and + non-matches. 
+ request: Original evaluation request used to derive stage and + ``applies_to``. + control_lookup: Parsed controls keyed by control ID. + trace_id: Optional trace ID for correlation. + span_id: Optional span ID for correlation. + agent_name: Optional override for the agent name stamped on events. + + Returns: + A list of reconstructed ``ControlExecutionEvent`` objects. + """ + resolved_trace_id, resolved_span_id = _resolve_event_trace_context(trace_id, span_id) + resolved_agent_name = agent_name or request.agent_name + now = datetime.now(UTC) + + events: list[ControlExecutionEvent] = [] + events.extend( + _build_events_for_matches( + response.matches, + matched=True, + request=request, + control_lookup=control_lookup, + trace_id=resolved_trace_id, + span_id=resolved_span_id, + agent_name=resolved_agent_name, + now=now, + ) + ) + events.extend( + _build_events_for_matches( + response.errors, + matched=False, + request=request, + control_lookup=control_lookup, + trace_id=resolved_trace_id, + span_id=resolved_span_id, + agent_name=resolved_agent_name, + now=now, + ) + ) + events.extend( + _build_events_for_matches( + response.non_matches, + matched=False, + request=request, + control_lookup=control_lookup, + trace_id=resolved_trace_id, + span_id=resolved_span_id, + agent_name=resolved_agent_name, + now=now, + ) + ) + return events + + +def enqueue_observability_events(events: list[ControlExecutionEvent]) -> None: + """Enqueue reconstructed events through the existing SDK observability path. + + This preserves the default SDK behavior of forwarding local events through + the existing observability batcher rather than a custom merged-event sink. + + Args: + events: Reconstructed control execution events to enqueue. + + Returns: + None. + """ + if not is_observability_enabled(): + return + + for event in events: + add_event(event) diff --git a/sdks/python/src/agent_control/settings.py b/sdks/python/src/agent_control/settings.py index f06398d4..a5dbd3d5 100644 --- a/sdks/python/src/agent_control/settings.py +++ b/sdks/python/src/agent_control/settings.py @@ -118,6 +118,24 @@ class SDKSettings(BaseSettings): description="Log span results (legacy compatibility)", ) + # Optional OTEL emission for merged control events + otel_enabled: bool = Field( + default=False, + description="Enable OTEL emission for merged control execution events", + ) + otel_endpoint: str = Field( + default="", + description="OTLP HTTP endpoint for control execution span export", + ) + otel_headers: dict[str, str] = Field( + default_factory=dict, + description="Headers to include when exporting OTEL spans", + ) + otel_service_name: str = Field( + default="agent-control", + description="Service name to use for OTEL control execution spans", + ) + # Global settings instance - loaded from environment at import time settings = SDKSettings() diff --git a/sdks/python/src/agent_control/telemetry/__init__.py b/sdks/python/src/agent_control/telemetry/__init__.py index 8d2ccf90..842b2264 100644 --- a/sdks/python/src/agent_control/telemetry/__init__.py +++ b/sdks/python/src/agent_control/telemetry/__init__.py @@ -1,5 +1,19 @@ -"""Telemetry interfaces for provider-agnostic tracing.""" +"""Telemetry interfaces for provider-agnostic tracing and event emission.""" +from .event_sink import ( + ControlEventSink, + clear_control_event_sink, + emit_control_events, + has_control_event_sink, + set_control_event_sink, +) +from .otel import ( + configure_otel_event_sink, + control_event_to_otel_attributes, + control_event_to_otel_span, + 
create_otel_event_sink, + is_otel_event_emission_configured, +) from .trace_context import ( TraceContext, TraceContextProvider, @@ -9,9 +23,19 @@ ) __all__ = [ + "ControlEventSink", "TraceContext", "TraceContextProvider", + "clear_control_event_sink", "clear_trace_context_provider", + "configure_otel_event_sink", + "control_event_to_otel_attributes", + "control_event_to_otel_span", + "create_otel_event_sink", + "emit_control_events", "get_trace_context_from_provider", + "has_control_event_sink", + "is_otel_event_emission_configured", + "set_control_event_sink", "set_trace_context_provider", ] diff --git a/sdks/python/src/agent_control/telemetry/event_sink.py b/sdks/python/src/agent_control/telemetry/event_sink.py new file mode 100644 index 00000000..cb9910c3 --- /dev/null +++ b/sdks/python/src/agent_control/telemetry/event_sink.py @@ -0,0 +1,65 @@ +"""Provider-agnostic sink for merged control execution events.""" + +from collections.abc import Callable + +from agent_control_models import ControlExecutionEvent + +ControlEventSink = Callable[[list[ControlExecutionEvent]], None] + +_control_event_sink: ControlEventSink | None = None + + +def set_control_event_sink(sink: ControlEventSink | None) -> None: + """Register a sink for merged control execution events. + + Registering a sink enables the optional merged-event path, where the SDK + reconstructs local and server events and emits them together after merging + results. + + Args: + sink: Sink callback to receive merged control execution events, or + ``None`` to clear the current sink. + + Returns: + None. + """ + global _control_event_sink + _control_event_sink = sink + + +def emit_control_events(events: list[ControlExecutionEvent]) -> None: + """Emit merged control execution events to the registered sink. + + Args: + events: Merged control execution events to emit. + + Returns: + None. Sink failures are swallowed so evaluation behavior is not changed + by telemetry issues. + """ + if not events or _control_event_sink is None: + return + + try: + _control_event_sink(events) + except Exception: + # Sink failures should not break control evaluation. + pass + + +def has_control_event_sink() -> bool: + """Return whether the optional merged-event path is enabled. + + Args: + None. + + Returns: + ``True`` when a merged control event sink has been registered. + """ + return _control_event_sink is not None + + +def clear_control_event_sink() -> None: + """Clear the registered control event sink.""" + global _control_event_sink + _control_event_sink = None diff --git a/sdks/python/src/agent_control/telemetry/otel.py b/sdks/python/src/agent_control/telemetry/otel.py new file mode 100644 index 00000000..787394bf --- /dev/null +++ b/sdks/python/src/agent_control/telemetry/otel.py @@ -0,0 +1,276 @@ +"""OTEL emission helpers for merged control execution events.""" + +from __future__ import annotations + +import importlib +import json +from datetime import UTC +from typing import Any + +from agent_control_models import ControlExecutionEvent + +from ..settings import SDKSettings, get_settings +from ..tracing import validate_span_id, validate_trace_id +from .event_sink import ControlEventSink, has_control_event_sink, set_control_event_sink + + +def _import_optional_module(module_name: str) -> Any | None: + """Import a module if available. + + Args: + module_name: Fully-qualified module name. + + Returns: + Imported module object when available, otherwise ``None``. 
+ """ + try: + return importlib.import_module(module_name) + except ImportError: # pragma: no cover - exercised in environments without OTEL installed + return None + + +def _import_optional_attr(module_name: str, attr_name: str) -> Any | None: + """Import an attribute from an optional module. + + Args: + module_name: Fully-qualified module name. + attr_name: Attribute to load from that module. + + Returns: + The imported attribute when available, otherwise ``None``. + """ + module = _import_optional_module(module_name) + if module is None: + return None + return getattr(module, attr_name, None) + + +trace: Any | None = _import_optional_module("opentelemetry.trace") +OTLPSpanExporter: Any | None = _import_optional_attr( + "opentelemetry.exporter.otlp.proto.http.trace_exporter", + "OTLPSpanExporter", +) +Resource: Any | None = _import_optional_attr("opentelemetry.sdk.resources", "Resource") +TracerProvider: Any | None = _import_optional_attr( + "opentelemetry.sdk.trace", + "TracerProvider", +) +BatchSpanProcessor: Any | None = _import_optional_attr( + "opentelemetry.sdk.trace.export", + "BatchSpanProcessor", +) +NonRecordingSpan: Any | None = _import_optional_attr( + "opentelemetry.trace", + "NonRecordingSpan", +) +SpanContext: Any | None = _import_optional_attr("opentelemetry.trace", "SpanContext") +TraceFlags: Any | None = _import_optional_attr("opentelemetry.trace", "TraceFlags") +TraceState: Any | None = _import_optional_attr("opentelemetry.trace", "TraceState") +set_span_in_context: Any | None = _import_optional_attr( + "opentelemetry.trace", + "set_span_in_context", +) + +_OTEL_SYSTEM = "agent-control" +_OTEL_EVENT_TYPE = "control_execution" +_TRACER_NAME = "agent_control.telemetry.otel" +_TRACER_VERSION = "1.0" +_otel_sink_configured = False + + +def is_otel_event_emission_configured( + sdk_settings: SDKSettings | None = None, +) -> bool: + """Return whether OTEL emission is configured. + + Args: + sdk_settings: Optional explicit SDK settings instance. + + Returns: + ``True`` when OTEL emission has been enabled or an OTLP endpoint has + been configured. + """ + effective_settings = sdk_settings or get_settings() + return effective_settings.otel_enabled or bool(effective_settings.otel_endpoint) + + +def control_event_to_otel_attributes( + event: ControlExecutionEvent, +) -> dict[str, str | bool | float | int]: + """Convert a control execution event into OTEL span attributes. + + Args: + event: Control execution event to translate. + + Returns: + A dictionary of OTEL span attributes representing the control event. 
+ """ + attributes: dict[str, str | bool | float | int] = { + "gen_ai.system": _OTEL_SYSTEM, + "agent_control.event_type": _OTEL_EVENT_TYPE, + "agent_control.control_execution_id": event.control_execution_id, + "agent_control.agent_name": event.agent_name, + "agent_control.control_id": event.control_id, + "agent_control.control_name": event.control_name, + "agent_control.check_stage": event.check_stage, + "agent_control.applies_to": event.applies_to, + "agent_control.action": event.action, + "agent_control.matched": event.matched, + "agent_control.confidence": event.confidence, + } + + if event.execution_duration_ms is not None: + attributes["agent_control.execution_duration_ms"] = event.execution_duration_ms + if event.evaluator_name is not None: + attributes["agent_control.evaluator_name"] = event.evaluator_name + if event.selector_path is not None: + attributes["agent_control.selector_path"] = event.selector_path + if event.error_message is not None: + attributes["agent_control.error_message"] = event.error_message + + for key, value in event.metadata.items(): + attr_key = f"agent_control.metadata.{key}" + if isinstance(value, (bool, int, float, str)): + attributes[attr_key] = value + else: + attributes[attr_key] = json.dumps(value, sort_keys=True, default=str) + + return attributes + + +def _build_parent_context(trace_id: str | None, span_id: str | None) -> Any | None: + """Build an OTEL parent context from event IDs when they are valid.""" + if not trace_id or not span_id: + return None + if not validate_trace_id(trace_id) or not validate_span_id(span_id): + return None + if ( + NonRecordingSpan is None + or SpanContext is None + or TraceFlags is None + or TraceState is None + or set_span_in_context is None + ): + return None + + parent_span = NonRecordingSpan( + SpanContext( + trace_id=int(trace_id, 16), + span_id=int(span_id, 16), + is_remote=True, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), + ) + ) + return set_span_in_context(parent_span) + + +def control_event_to_otel_span( + event: ControlExecutionEvent, + tracer: Any, +) -> None: + """Emit one control execution event as an OTEL span. + + Args: + event: Control execution event to emit. + tracer: OTEL tracer used to create the span. + + Returns: + None. 
+ """ + parent_context = _build_parent_context(event.trace_id, event.span_id) + start_time = int(event.timestamp.astimezone(UTC).timestamp() * 1_000_000_000) + end_time = start_time + if event.execution_duration_ms is not None: + end_time += int(event.execution_duration_ms * 1_000_000) + + span = tracer.start_span( + name=f"control:{event.control_name}", + context=parent_context, + start_time=start_time, + attributes=control_event_to_otel_attributes(event), + ) + span.end(end_time=end_time) + + +def _ensure_otel_tracer( + sdk_settings: SDKSettings, +) -> Any | None: + """Return a configured OTEL tracer for control execution spans.""" + if trace is None: + return None + + global _otel_sink_configured + + if sdk_settings.otel_endpoint and not _otel_sink_configured: + if ( + OTLPSpanExporter is None + or Resource is None + or TracerProvider is None + or BatchSpanProcessor is None + ): + return None + + provider = trace.get_tracer_provider() + if not isinstance(provider, TracerProvider): + provider = TracerProvider( + resource=Resource.create({"service.name": sdk_settings.otel_service_name}) + ) + trace.set_tracer_provider(provider) + + exporter = OTLPSpanExporter( + endpoint=sdk_settings.otel_endpoint, + headers=sdk_settings.otel_headers or None, + ) + provider.add_span_processor(BatchSpanProcessor(exporter)) + _otel_sink_configured = True + + return trace.get_tracer(_TRACER_NAME, _TRACER_VERSION) + + +def create_otel_event_sink( + sdk_settings: SDKSettings | None = None, +) -> ControlEventSink | None: + """Create a merged-event sink that emits OTEL spans. + + Args: + sdk_settings: Optional explicit SDK settings instance. + + Returns: + A control event sink when OTEL support is available, otherwise ``None``. + """ + effective_settings = sdk_settings or get_settings() + tracer = _ensure_otel_tracer(effective_settings) + if tracer is None: + return None + + def sink(events: list[ControlExecutionEvent]) -> None: + for event in events: + control_event_to_otel_span(event, tracer) + + return sink + + +def configure_otel_event_sink( + sdk_settings: SDKSettings | None = None, +) -> bool: + """Register the OTEL merged-event sink when OTEL settings are present. + + Args: + sdk_settings: Optional explicit SDK settings instance. + + Returns: + ``True`` when the OTEL sink was registered, otherwise ``False``. 
+ """ + effective_settings = sdk_settings or get_settings() + if not is_otel_event_emission_configured(effective_settings): + return False + if has_control_event_sink(): + return False + + sink = create_otel_event_sink(effective_settings) + if sink is None: + return False + + set_control_event_sink(sink) + return True diff --git a/sdks/python/src/agent_control/telemetry/trace_context.py b/sdks/python/src/agent_control/telemetry/trace_context.py index a871fb29..be545725 100644 --- a/sdks/python/src/agent_control/telemetry/trace_context.py +++ b/sdks/python/src/agent_control/telemetry/trace_context.py @@ -38,8 +38,6 @@ def get_trace_context_from_provider() -> TraceContext | None: trace_id = trace_context.get("trace_id") span_id = trace_context.get("span_id") - if not isinstance(trace_id, str) or not isinstance(span_id, str): - return None if not trace_id or not span_id: return None diff --git a/sdks/python/tests/test_event_sink.py b/sdks/python/tests/test_event_sink.py new file mode 100644 index 00000000..8013f4d6 --- /dev/null +++ b/sdks/python/tests/test_event_sink.py @@ -0,0 +1,59 @@ +"""Tests for the telemetry merged control event sink interface.""" + +from datetime import UTC, datetime + +from agent_control.telemetry.event_sink import ( + clear_control_event_sink, + emit_control_events, + set_control_event_sink, +) +from agent_control_models import ControlExecutionEvent + + +def _event() -> ControlExecutionEvent: + return ControlExecutionEvent( + control_execution_id="ce-1", + trace_id="a" * 32, + span_id="b" * 16, + agent_name="test-agent", + control_id=1, + control_name="pii_check", + check_stage="pre", + applies_to="llm_call", + action="allow", + matched=False, + confidence=0.95, + timestamp=datetime.now(UTC), + metadata={}, + ) + + +def teardown_function() -> None: + clear_control_event_sink() + + +def test_emit_control_events_calls_registered_sink() -> None: + seen: list[list[ControlExecutionEvent]] = [] + + def _sink(events: list[ControlExecutionEvent]) -> None: + seen.append(events) + + event = _event() + set_control_event_sink(_sink) + + emit_control_events([event]) + + assert seen == [[event]] + + +def test_emit_control_events_noops_without_sink() -> None: + emit_control_events([_event()]) + + +def test_emit_control_events_swallows_sink_failures() -> None: + def _sink(_events: list[ControlExecutionEvent]) -> None: + raise RuntimeError("boom") + + set_control_event_sink(_sink) + + emit_control_events([_event()]) diff --git a/sdks/python/tests/test_observability_updates.py b/sdks/python/tests/test_observability_updates.py index bb11a5ae..3d8c284c 100644 --- a/sdks/python/tests/test_observability_updates.py +++ b/sdks/python/tests/test_observability_updates.py @@ -1,14 +1,18 @@ -"""Tests for observability updates: event emission, non_matches propagation, applies_to mapping.""" +"""Tests for reconstructed control-execution events in SDK evaluation flows.""" from unittest.mock import AsyncMock, MagicMock, patch import pytest from agent_control import evaluation -from agent_control.evaluation import ( - _ControlAdapter, - _emit_local_events, - _map_applies_to, - _merge_results, +from agent_control.evaluation import _ControlAdapter, _merge_results +from agent_control.evaluation_events import ( + build_control_execution_events, + enqueue_observability_events, + map_applies_to, +) +from agent_control.telemetry.trace_context import ( + clear_trace_context_provider, + set_trace_context_provider, ) from agent_control.telemetry.trace_context import ( clear_trace_context_provider, @@ -16,37 
+20,19 @@ ) from agent_control_models import ControlDefinition -# ============================================================================= -# _map_applies_to tests -# ============================================================================= - class TestMapAppliesTo: - """Tests for _map_applies_to helper.""" - def test_maps_tool_to_tool_call(self): - assert _map_applies_to("tool") == "tool_call" + assert map_applies_to("tool") == "tool_call" def test_maps_llm_to_llm_call(self): - assert _map_applies_to("llm") == "llm_call" - - def test_maps_unknown_to_llm_call(self): - """Unknown types default to llm_call (matches server pattern).""" - assert _map_applies_to("unknown") == "llm_call" - assert _map_applies_to("") == "llm_call" - - -# ============================================================================= -# _merge_results tests -# ============================================================================= + assert map_applies_to("llm") == "llm_call" class TestMergeResults: - """Tests for _merge_results combining non_matches.""" - def _make_response(self, **kwargs): - """Create a mock EvaluationResponse.""" from agent_control_models import EvaluationResponse + defaults = { "is_safe": True, "confidence": 1.0, @@ -60,6 +46,7 @@ def _make_response(self, **kwargs): def _make_match(self, control_id, control_name="ctrl", action="allow", matched=True): from agent_control_models import ControlMatch, EvaluatorResult + return ControlMatch( control_id=control_id, control_name=control_name, @@ -67,69 +54,55 @@ def _make_match(self, control_id, control_name="ctrl", action="allow", matched=T result=EvaluatorResult(matched=matched, confidence=0.9), ) - def test_combines_non_matches(self): - """non_matches from both sides should be combined.""" - nm1 = self._make_match(1, "ctrl-1", matched=False) - nm2 = self._make_match(2, "ctrl-2", matched=False) - - local = self._make_response(non_matches=[nm1]) - server = self._make_response(non_matches=[nm2]) - - result = _merge_results(local, server) - assert result.non_matches is not None - assert len(result.non_matches) == 2 - ids = {nm.control_id for nm in result.non_matches} - assert ids == {1, 2} - - def test_non_matches_none_when_both_empty(self): - local = self._make_response() - server = self._make_response() - result = _merge_results(local, server) - assert result.non_matches is None + def test_combines_matches_errors_and_non_matches(self): + local = self._make_response( + matches=[self._make_match(1)], + errors=[self._make_match(2, matched=False)], + ) + server = self._make_response(non_matches=[self._make_match(3, matched=False)]) - def test_non_matches_from_one_side(self): - nm = self._make_match(1, matched=False) - local = self._make_response(non_matches=[nm]) - server = self._make_response() result = _merge_results(local, server) - assert result.non_matches is not None - assert len(result.non_matches) == 1 - def test_still_combines_matches_and_errors(self): - m1 = self._make_match(1, "m1") - m2 = self._make_match(2, "m2") - e1 = self._make_match(3, "e1", matched=False) + assert [match.control_id for match in result.matches or []] == [1] + assert [match.control_id for match in result.errors or []] == [2] + assert [match.control_id for match in result.non_matches or []] == [3] - local = self._make_response(matches=[m1], errors=[e1]) - server = self._make_response(matches=[m2]) - - result = _merge_results(local, server) - assert len(result.matches) == 2 - assert len(result.errors) == 1 +class TestBuildControlExecutionEvents: + def 
_make_control(self, id, name, condition): + return _ControlAdapter( + id=id, + name=name, + control=ControlDefinition( + execution="sdk", + condition=condition, + action={"decision": "allow"}, + ), + ) -# ============================================================================= -# _emit_local_events tests -# ============================================================================= + def _make_request(self, step_type="llm"): + from agent_control_models import EvaluationRequest + step_input = {"query": "hello"} if step_type == "tool" else "hello" + return EvaluationRequest( + agent_name="agent-000000000001", + step={"type": step_type, "name": "test-step", "input": step_input}, + stage="pre", + ) -class TestEmitLocalEvents: - """Tests for _emit_local_events helper.""" + def _make_match(self, control_id, control_name="ctrl", action="allow", matched=True): + from agent_control_models import ControlMatch, EvaluatorResult - def _make_control_adapter(self, id, name, evaluator_name="regex", selector_path="input"): - """Create a _ControlAdapter for testing.""" - control_def = ControlDefinition( - execution="sdk", - condition={ - "evaluator": {"name": evaluator_name, "config": {"pattern": "test"}}, - "selector": {"path": selector_path}, - }, - action={"decision": "deny"}, + return ControlMatch( + control_id=control_id, + control_name=control_name, + action=action, + result=EvaluatorResult(matched=matched, confidence=0.9), ) - return _ControlAdapter(id=id, name=name, control=control_def) def _make_response(self, matches=None, errors=None, non_matches=None): from agent_control_models import EvaluationResponse + return EvaluationResponse( is_safe=not bool(matches), confidence=1.0 if not matches else 0.5, @@ -138,130 +111,45 @@ def _make_response(self, matches=None, errors=None, non_matches=None): non_matches=non_matches, ) - def _make_match(self, control_id, control_name="ctrl", action="deny", matched=True): - from agent_control_models import ControlMatch, EvaluatorResult - return ControlMatch( - control_id=control_id, - control_name=control_name, - action=action, - result=EvaluatorResult(matched=matched, confidence=0.9), - ) - - def _make_request(self, step_type="llm"): - from agent_control_models import EvaluationRequest - # Tool steps require object input, LLM steps accept string - step_input = {"query": "hello"} if step_type == "tool" else "hello" - return EvaluationRequest( - agent_name="agent-000000000001", - step={"type": step_type, "name": "test-step", "input": step_input}, - stage="pre", - ) - - def test_emits_events_when_observability_enabled(self): - """Should call add_event for each match/error/non_match.""" - from agent_control.evaluation import _emit_local_events - - ctrl = self._make_control_adapter(1, "ctrl-1") - match = self._make_match(1, "ctrl-1") - non_match = self._make_match(2, "ctrl-2", matched=False) - response = self._make_response(matches=[match], non_matches=[non_match]) - request = self._make_request() - - with patch("agent_control.evaluation.is_observability_enabled", return_value=True), \ - patch("agent_control.evaluation.add_event") as mock_add: - _emit_local_events( - response, request, - [ctrl, self._make_control_adapter(2, "ctrl-2")], - "trace123", "span456", "test-agent", - ) - assert mock_add.call_count == 2 - # Verify event fields for the match - event = mock_add.call_args_list[0][0][0] - assert event.trace_id == "trace123" - assert event.span_id == "span456" - assert event.agent_name == "test-agent" - assert event.matched is True - assert 
event.evaluator_name == "regex" - assert event.selector_path == "input" - - def test_skips_when_observability_disabled(self): - """Should not call add_event when observability is disabled.""" - from agent_control.evaluation import _emit_local_events - - ctrl = self._make_control_adapter(1, "ctrl-1") - match = self._make_match(1, "ctrl-1") - response = self._make_response(matches=[match]) + def test_builds_events_with_trace_context(self): + response = self._make_response(matches=[self._make_match(1, "ctrl-1")]) request = self._make_request() + control_lookup = { + 1: self._make_control( + 1, + "ctrl-1", + { + "evaluator": {"name": "regex", "config": {"pattern": "test"}}, + "selector": {"path": "input"}, + }, + ).control + } - with patch("agent_control.evaluation.is_observability_enabled", return_value=False), \ - patch("agent_control.evaluation.add_event") as mock_add: - _emit_local_events( - response, request, [ctrl], - "trace123", "span456", "test-agent", - ) - mock_add.assert_not_called() - - def test_maps_tool_step_to_tool_call(self): - """Should set applies_to='tool_call' for tool steps.""" - from agent_control.evaluation import _emit_local_events - - ctrl = self._make_control_adapter(1, "ctrl-1") - match = self._make_match(1, "ctrl-1") - response = self._make_response(matches=[match]) - request = self._make_request(step_type="tool") - - with patch("agent_control.evaluation.is_observability_enabled", return_value=True), \ - patch("agent_control.evaluation.add_event") as mock_add: - _emit_local_events( - response, request, [ctrl], - "trace123", "span456", "test-agent", - ) - event = mock_add.call_args_list[0][0][0] - assert event.applies_to == "tool_call" - - def test_uses_fallback_ids_when_trace_context_missing(self): - """Should emit events with all-zero fallback IDs when trace context is absent.""" - import agent_control.evaluation as eval_mod - from agent_control.evaluation import ( - _FALLBACK_SPAN_ID, - _FALLBACK_TRACE_ID, - _emit_local_events, + events = build_control_execution_events( + response, + request, + control_lookup, + "trace123", + "span456", + "test-agent", ) - ctrl = self._make_control_adapter(1, "ctrl-1") - match = self._make_match(1, "ctrl-1") - response = self._make_response(matches=[match]) - request = self._make_request() - - # Reset the once-only warning flag so the warning fires in this test - eval_mod._trace_warning_logged = False + assert len(events) == 1 + event = events[0] + assert event.trace_id == "trace123" + assert event.span_id == "span456" + assert event.agent_name == "test-agent" + assert event.evaluator_name == "regex" + assert event.selector_path == "input" - with patch("agent_control.evaluation.is_observability_enabled", return_value=True), \ - patch("agent_control.evaluation.add_event") as mock_add, \ - patch("agent_control.evaluation._logger") as mock_logger: - _emit_local_events( - response, request, [ctrl], - None, None, "test-agent", - ) - assert mock_add.call_count == 1 - event = mock_add.call_args_list[0][0][0] - assert event.trace_id == _FALLBACK_TRACE_ID - assert event.span_id == _FALLBACK_SPAN_ID - assert event.trace_id == "0" * 32 - assert event.span_id == "0" * 16 - # Warning should have been logged - mock_logger.warning.assert_called_once() - assert "fallback" in mock_logger.warning.call_args[0][0].lower() - - def test_composite_control_emits_representative_leaf_metadata(self): - """Composite local controls should emit stable representative metadata.""" - # Given: a composite local control and a non-match response for that control - 
ctrl = _ControlAdapter( - id=1, - name="composite-ctrl", - control=ControlDefinition( - execution="sdk", - condition={ + def test_composite_control_uses_representative_observability_identity(self): + response = self._make_response(non_matches=[self._make_match(1, "ctrl-1", matched=False)]) + request = self._make_request() + control_lookup = { + 1: self._make_control( + 1, + "ctrl-1", + { "and": [ { "selector": {"path": "input"}, @@ -273,27 +161,20 @@ def test_composite_control_emits_representative_leaf_metadata(self): }, ] }, - action={"decision": "allow"}, - ), - ) - non_match = self._make_match(1, "composite-ctrl", action="allow", matched=False) - response = self._make_response(non_matches=[non_match]) - request = self._make_request() + ).control + } - # When: emitting local observability events - with patch("agent_control.evaluation.is_observability_enabled", return_value=True), \ - patch("agent_control.evaluation.add_event") as mock_add: - _emit_local_events( - response, - request, - [ctrl], - "trace123", - "span456", - "test-agent", - ) - event = mock_add.call_args_list[0][0][0] + events = build_control_execution_events( + response, + request, + control_lookup, + "trace123", + "span456", + "test-agent", + ) - # Then: the first leaf becomes the event identity and full context is preserved + assert len(events) == 1 + event = events[0] assert event.evaluator_name == "regex" assert event.selector_path == "input" assert event.metadata["primary_evaluator"] == "regex" @@ -302,52 +183,45 @@ assert event.metadata["all_evaluators"] == ["regex"] assert event.metadata["all_selector_paths"] == ["input", "output"] - def test_fallback_warning_logged_only_once(self): - """The missing-trace-context warning should fire only on the first call.""" - import agent_control.evaluation as eval_mod - from agent_control.evaluation import _emit_local_events - - ctrl = self._make_control_adapter(1, "ctrl-1") - match = self._make_match(1, "ctrl-1") - response = self._make_response(matches=[match]) - request = self._make_request() - - eval_mod._trace_warning_logged = False + def test_enqueue_observability_events_uses_existing_batcher(self): + from agent_control_models import ControlExecutionEvent - with patch("agent_control.evaluation.is_observability_enabled", return_value=True), \ - patch("agent_control.evaluation.add_event"), \ - patch("agent_control.evaluation._logger") as mock_logger: - _emit_local_events(response, request, [ctrl], None, None, "agent-test-a1") - _emit_local_events(response, request, [ctrl], None, None, "agent-test-a1") - assert mock_logger.warning.call_count == 1 + events = [ + ControlExecutionEvent( + trace_id="a" * 32, + span_id="b" * 16, + agent_name="agent-000000000001", + control_id=1, + control_name="ctrl-1", + check_stage="pre", + applies_to="llm_call", + action="allow", + matched=False, + confidence=1.0, + ) + ] + with patch("agent_control.evaluation_events.is_observability_enabled", return_value=True), \ + patch("agent_control.evaluation_events.add_event") as mock_add: + enqueue_observability_events(events) -# ============================================================================= -# check_evaluation_with_local event emission + header forwarding -# ============================================================================= + mock_add.assert_called_once_with(events[0]) class TestCheckEvaluationWithLocal: - """Tests for check_evaluation_with_local event emission and non_matches."""
def teardown_method(self) -> None: clear_trace_context_provider() @pytest.mark.asyncio - async def test_emits_events_when_trace_context_provided(self): - """Should emit observability events when trace_id and span_id are passed.""" - from agent_control_models import ( - ControlMatch, - EvaluationResponse, - EvaluatorResult, - Step, - ) + async def test_delivers_local_events_in_oss_mode(self): + from agent_control_models import ControlMatch, EvaluationResponse, EvaluatorResult, Step mock_response = EvaluationResponse( is_safe=True, confidence=1.0, - matches=None, - errors=None, non_matches=[ ControlMatch( control_id=1, @@ -357,7 +231,6 @@ ) ], ) - mock_engine = MagicMock() mock_engine.process = AsyncMock(return_value=mock_response) @@ -380,7 +253,7 @@ with patch("agent_control.evaluation.ControlEngine", return_value=mock_engine), \ patch("agent_control.evaluation.list_evaluators", return_value=["regex"]), \ - patch("agent_control.evaluation._emit_local_events") as mock_emit: + patch("agent_control.evaluation.enqueue_observability_events") as mock_enqueue: result = await evaluation.check_evaluation_with_local( client=client, agent_name="agent-000000000001", @@ -392,29 +265,32 @@ event_agent_name="test-agent", ) - mock_emit.assert_called_once() - call_args = mock_emit.call_args - assert call_args[0][2] is not None # local_controls - assert call_args[0][3] == "abc123" # trace_id - assert call_args[0][4] == "def456" # span_id - assert call_args.kwargs["agent_name"] == "test-agent" - - # Also verify non_matches propagated + mock_enqueue.assert_called_once() + delivered_events = mock_enqueue.call_args.args[0] + assert len(delivered_events) == 1 + assert delivered_events[0].trace_id == "abc123" + assert delivered_events[0].span_id == "def456" assert result.non_matches is not None assert len(result.non_matches) == 1 @pytest.mark.asyncio - async def test_emits_events_without_trace_context(self): - """Should resolve trace context from the provider when IDs are omitted.""" - from agent_control_models import EvaluationResponse, Step + async def test_resolves_provider_trace_context_for_local_events(self): - from agent_control_models import ControlMatch, EvaluationResponse, EvaluatorResult, Step mock_response = EvaluationResponse( - is_safe=True, confidence=1.0, matches=None, errors=None, non_matches=None, + is_safe=True, + confidence=1.0, + non_matches=[ + ControlMatch( + control_id=1, + control_name="test-ctrl", + action="allow", + result=EvaluatorResult(matched=False, confidence=0.1), + ) + ], ) - mock_engine = MagicMock() mock_engine.process = AsyncMock(return_value=mock_response) - controls = [{ "id": 1, "name": "test-ctrl", @@ -431,35 +307,27 @@ client = MagicMock() client.http_client = AsyncMock() step = Step(type="llm", name="test-step", input="hello") - set_trace_context_provider( - lambda: { - "trace_id": "a" * 32, - "span_id": "b" * 16, - } - ) + set_trace_context_provider(lambda: {"trace_id": "a" * 32, "span_id": "b" * 16}) with patch("agent_control.evaluation.ControlEngine", return_value=mock_engine), \ patch("agent_control.evaluation.list_evaluators", return_value=["regex"]), \ - patch("agent_control.evaluation._emit_local_events") as mock_emit: + patch("agent_control.evaluation.enqueue_observability_events") 
as mock_enqueue: await evaluation.check_evaluation_with_local( client=client, agent_name="agent-000000000001", step=step, stage="pre", controls=controls, - # No trace_id/span_id ) - mock_emit.assert_called_once() - call_args = mock_emit.call_args - assert call_args[0][3] == "a" * 32 - assert call_args[0][4] == "b" * 16 + + delivered_events = mock_enqueue.call_args.args[0] + assert delivered_events[0].trace_id == "a" * 32 + assert delivered_events[0].span_id == "b" * 16 @pytest.mark.asyncio - async def test_forwards_trace_headers_to_server(self): - """Server POST should include X-Trace-Id and X-Span-Id headers.""" + async def test_forwards_provider_trace_headers_to_server_when_ids_omitted(self): from agent_control_models import Step - # Only server controls, no local controls controls = [{ "id": 1, "name": "server-ctrl", @@ -487,6 +355,7 @@ async def test_forwards_trace_headers_to_server(self): client.http_client = AsyncMock() client.http_client.post = AsyncMock(return_value=mock_http_response) step = Step(type="llm", name="test-step", input="hello") + set_trace_context_provider(lambda: {"trace_id": "c" * 32, "span_id": "d" * 16}) with patch("agent_control.evaluation.list_evaluators", return_value=["regex"]): await evaluation.check_evaluation_with_local( @@ -495,10 +364,11 @@ async def test_forwards_trace_headers_to_server(self): step=step, stage="pre", controls=controls, - trace_id="aaaa1111bbbb2222cccc3333dddd4444", - span_id="eeee5555ffff6666", ) + headers = client.http_client.post.call_args.kwargs["headers"] + assert headers["X-Trace-Id"] == "c" * 32 + assert headers["X-Span-Id"] == "d" * 16 # Verify POST was called with headers call_kwargs = client.http_client.post.call_args headers = call_kwargs.kwargs.get("headers", {}) @@ -568,45 +438,97 @@ class TestControlDecoratorsNonMatches: """Tests for non_matches dict conversion in control_decorators._evaluate.""" @pytest.mark.asyncio - async def test_non_matches_populated_in_stats(self): - """non_matches should be properly converted to dicts for stats tracking.""" - from agent_control.control_decorators import ControlContext + async def test_merged_event_sink_emits_reconstructed_local_and_server_events_once(self): + from agent_control_models import ControlMatch, EvaluationResponse, EvaluatorResult, Step - # Simulate a result dict with non_matches - result = { + local_response = EvaluationResponse( + is_safe=True, + confidence=1.0, + matches=[ + ControlMatch( + control_id=1, + control_name="local-ctrl", + action="allow", + result=EvaluatorResult(matched=False, confidence=0.8), + ) + ], + ) + server_response = { "is_safe": True, - "confidence": 1.0, - "matches": None, - "errors": None, - "non_matches": [ - { - "control_id": 1, - "control_name": "ctrl-1", - "action": "allow", - "result": {"matched": False, "confidence": 0.1}, - }, + "confidence": 0.9, + "matches": [ { "control_id": 2, - "control_name": "ctrl-2", - "action": "deny", - "result": {"matched": False, "confidence": 0.2}, - }, + "control_name": "server-ctrl", + "action": "allow", + "control_execution_id": "ce-server", + "result": {"matched": False, "confidence": 0.4}, + } ], + "errors": None, + "non_matches": None, } - ctx = ControlContext( - agent_name="test-agent", - server_url="http://localhost:8000", - func=lambda: None, - args=(), - kwargs={}, - trace_id="trace123", - span_id="span456", - start_time=0, - ) + controls = [ + { + "id": 1, + "name": "local-ctrl", + "control": { + "condition": { + "evaluator": {"name": "regex", "config": {"pattern": "test"}}, + "selector": {"path": 
"input"}, + }, + "action": {"decision": "allow"}, + "execution": "sdk", + }, + }, + { + "id": 2, + "name": "server-ctrl", + "control": { + "condition": { + "evaluator": {"name": "regex", "config": {"pattern": "test"}}, + "selector": {"path": "input"}, + }, + "action": {"decision": "allow"}, + "execution": "server", + }, + }, + ] - ctx._update_stats(result) - assert ctx.total_executions == 2 - assert ctx.total_non_matches == 2 - assert ctx.total_matches == 0 - assert ctx.total_errors == 0 + mock_engine = MagicMock() + mock_engine.process = AsyncMock(return_value=local_response) + mock_http_response = MagicMock() + mock_http_response.raise_for_status = MagicMock() + mock_http_response.json.return_value = server_response + + client = MagicMock() + client.http_client = AsyncMock() + client.http_client.post = AsyncMock(return_value=mock_http_response) + step = Step(type="llm", name="test-step", input="hello") + + with patch("agent_control.evaluation.ControlEngine", return_value=mock_engine), \ + patch("agent_control.evaluation.list_evaluators", return_value=["regex"]), \ + patch("agent_control.evaluation.has_control_event_sink", return_value=True), \ + patch("agent_control.evaluation.emit_control_events") as mock_emit, \ + patch("agent_control.evaluation.enqueue_observability_events") as mock_enqueue: + result = await evaluation.check_evaluation_with_local( + client=client, + agent_name="agent-000000000001", + step=step, + stage="pre", + controls=controls, + trace_id="abc123", + span_id="def456", + event_agent_name="test-agent", + ) + + mock_enqueue.assert_not_called() + mock_emit.assert_called_once() + merged_events = mock_emit.call_args.args[0] + assert len(merged_events) == 2 + assert {event.control_id for event in merged_events} == {1, 2} + headers = client.http_client.post.call_args.kwargs["headers"] + assert headers["X-Agent-Control-Merge-Events"] == "true" + assert result.matches is not None + assert len(result.matches) == 2 diff --git a/sdks/python/tests/test_otel_telemetry.py b/sdks/python/tests/test_otel_telemetry.py new file mode 100644 index 00000000..d93b4680 --- /dev/null +++ b/sdks/python/tests/test_otel_telemetry.py @@ -0,0 +1,140 @@ +"""Tests for OTEL emission support for merged control execution events.""" + +from datetime import UTC, datetime +from unittest.mock import MagicMock, patch + +from agent_control.telemetry.event_sink import clear_control_event_sink +from agent_control.telemetry.otel import ( + configure_otel_event_sink, + control_event_to_otel_attributes, + control_event_to_otel_span, + create_otel_event_sink, + is_otel_event_emission_configured, +) +from agent_control.settings import SDKSettings +from agent_control_models import ControlExecutionEvent + + +class _FakeSpan: + def __init__(self) -> None: + self.ended_with: int | None = None + + def end(self, end_time: int | None = None) -> None: + self.ended_with = end_time + + +class _FakeTracer: + def __init__(self) -> None: + self.calls: list[dict] = [] + self.spans: list[_FakeSpan] = [] + + def start_span(self, **kwargs): + self.calls.append(kwargs) + span = _FakeSpan() + self.spans.append(span) + return span + + +def _make_event(**overrides) -> ControlExecutionEvent: + base = { + "control_execution_id": "ce-123", + "trace_id": "a" * 32, + "span_id": "b" * 16, + "agent_name": "agent-000000000001", + "control_id": 1, + "control_name": "policy-check", + "check_stage": "pre", + "applies_to": "llm_call", + "action": "deny", + "matched": True, + "confidence": 0.91, + "timestamp": datetime(2026, 3, 31, 12, 0, 
tzinfo=UTC), + "execution_duration_ms": 7.5, + "evaluator_name": "regex", + "selector_path": "input", + "error_message": None, + "metadata": {"leaf_count": 2, "condition_trace": {"kind": "and"}}, + } + base.update(overrides) + return ControlExecutionEvent(**base) + + +def teardown_function() -> None: + clear_control_event_sink() + + +def test_is_otel_event_emission_configured_detects_settings() -> None: + assert is_otel_event_emission_configured(SDKSettings(otel_enabled=True)) is True + assert is_otel_event_emission_configured(SDKSettings(otel_endpoint="http://collector")) is True + assert is_otel_event_emission_configured(SDKSettings()) is False + + +def test_control_event_to_otel_attributes_maps_expected_fields() -> None: + event = _make_event(error_message="blocked") + + attrs = control_event_to_otel_attributes(event) + + assert attrs["gen_ai.system"] == "agent-control" + assert attrs["agent_control.event_type"] == "control_execution" + assert attrs["agent_control.control_execution_id"] == "ce-123" + assert attrs["agent_control.agent_name"] == "agent-000000000001" + assert attrs["agent_control.control_id"] == 1 + assert attrs["agent_control.action"] == "deny" + assert attrs["agent_control.matched"] is True + assert attrs["agent_control.confidence"] == 0.91 + assert attrs["agent_control.evaluator_name"] == "regex" + assert attrs["agent_control.selector_path"] == "input" + assert attrs["agent_control.error_message"] == "blocked" + assert attrs["agent_control.metadata.leaf_count"] == 2 + assert attrs["agent_control.metadata.condition_trace"] == '{"kind": "and"}' + + +def test_control_event_to_otel_span_uses_parent_context_and_timing() -> None: + tracer = _FakeTracer() + event = _make_event() + + with patch("agent_control.telemetry.otel._build_parent_context", return_value="ctx"): + control_event_to_otel_span(event, tracer) + + assert len(tracer.calls) == 1 + call = tracer.calls[0] + assert call["name"] == "control:policy-check" + assert call["context"] == "ctx" + assert call["attributes"]["agent_control.control_name"] == "policy-check" + assert call["start_time"] == int(event.timestamp.timestamp() * 1_000_000_000) + assert tracer.spans[0].ended_with == call["start_time"] + 7_500_000 + + +def test_create_otel_event_sink_uses_configured_tracer() -> None: + tracer = _FakeTracer() + event = _make_event() + + with patch("agent_control.telemetry.otel._ensure_otel_tracer", return_value=tracer): + sink = create_otel_event_sink(SDKSettings(otel_enabled=True)) + + assert sink is not None + with patch("agent_control.telemetry.otel._build_parent_context", return_value=None): + sink([event]) + + assert tracer.calls[0]["attributes"]["agent_control.control_execution_id"] == "ce-123" + + +def test_configure_otel_event_sink_registers_when_no_sink_exists() -> None: + fake_sink = MagicMock() + + with patch("agent_control.telemetry.otel.has_control_event_sink", return_value=False), \ + patch("agent_control.telemetry.otel.create_otel_event_sink", return_value=fake_sink), \ + patch("agent_control.telemetry.otel.set_control_event_sink") as mock_set: + configured = configure_otel_event_sink(SDKSettings(otel_enabled=True)) + + assert configured is True + mock_set.assert_called_once_with(fake_sink) + + +def test_configure_otel_event_sink_does_not_override_existing_sink() -> None: + with patch("agent_control.telemetry.otel.has_control_event_sink", return_value=True), \ + patch("agent_control.telemetry.otel.create_otel_event_sink") as mock_create: + configured = 
configure_otel_event_sink(SDKSettings(otel_enabled=True)) + + assert configured is False + mock_create.assert_not_called() diff --git a/sdks/typescript/src/generated/funcs/evaluation-evaluate.ts b/sdks/typescript/src/generated/funcs/evaluation-evaluate.ts index 47ec39e4..81862c54 100644 --- a/sdks/typescript/src/generated/funcs/evaluation-evaluate.ts +++ b/sdks/typescript/src/generated/funcs/evaluation-evaluate.ts @@ -109,6 +109,11 @@ async function $do( const headers = new Headers(compactMap({ "Content-Type": "application/json", Accept: "application/json", + "X-Agent-Control-Merge-Events": encodeSimple( + "X-Agent-Control-Merge-Events", + payload["X-Agent-Control-Merge-Events"], + { explode: false, charEncoding: "none" }, + ), "X-Span-Id": encodeSimple("X-Span-Id", payload["X-Span-Id"], { explode: false, charEncoding: "none", diff --git a/sdks/typescript/src/generated/models/operations/evaluate-api-v1-evaluation-post.ts b/sdks/typescript/src/generated/models/operations/evaluate-api-v1-evaluation-post.ts index 026e4065..204841a6 100644 --- a/sdks/typescript/src/generated/models/operations/evaluate-api-v1-evaluation-post.ts +++ b/sdks/typescript/src/generated/models/operations/evaluate-api-v1-evaluation-post.ts @@ -9,6 +9,7 @@ import * as models from "../index.js"; export type EvaluateApiV1EvaluationPostRequest = { xTraceId?: string | null | undefined; xSpanId?: string | null | undefined; + xAgentControlMergeEvents?: string | null | undefined; body: models.EvaluationRequest; }; @@ -16,6 +17,7 @@ export type EvaluateApiV1EvaluationPostRequest = { export type EvaluateApiV1EvaluationPostRequest$Outbound = { "X-Trace-Id"?: string | null | undefined; "X-Span-Id"?: string | null | undefined; + "X-Agent-Control-Merge-Events"?: string | null | undefined; body: models.EvaluationRequest$Outbound; }; @@ -27,12 +29,14 @@ export const EvaluateApiV1EvaluationPostRequest$outboundSchema: z.ZodMiniType< z.object({ xTraceId: z.optional(z.nullable(z.string())), xSpanId: z.optional(z.nullable(z.string())), + xAgentControlMergeEvents: z.optional(z.nullable(z.string())), body: models.EvaluationRequest$outboundSchema, }), z.transform((v) => { return remap$(v, { xTraceId: "X-Trace-Id", xSpanId: "X-Span-Id", + xAgentControlMergeEvents: "X-Agent-Control-Merge-Events", }); }), ); diff --git a/server/src/agent_control_server/endpoints/evaluation.py b/server/src/agent_control_server/endpoints/evaluation.py index c92ea315..68a315b4 100644 --- a/server/src/agent_control_server/endpoints/evaluation.py +++ b/server/src/agent_control_server/endpoints/evaluation.py @@ -156,6 +156,7 @@ async def evaluate( db: AsyncSession = Depends(get_async_db), x_trace_id: str | None = Header(default=None, alias="X-Trace-Id"), x_span_id: str | None = Header(default=None, alias="X-Span-Id"), + x_merge_events: str | None = Header(default=None, alias="X-Agent-Control-Merge-Events"), ) -> EvaluationResponse: """Analyze content for safety and control violations. 
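For reviewers who want to exercise the new opt-in header without going through the SDK, here is a minimal sketch of a raw request against the endpoint above. The path, header names, the `"true"` sentinel value, and the payload shape come from this diff and its tests; the `httpx` client, the localhost URL, and the concrete field values are assumptions for illustration:

```python
import httpx

# Hypothetical values; any 32/16-character hex IDs work for the trace headers.
headers = {
    "X-Trace-Id": "a" * 32,
    "X-Span-Id": "b" * 16,
    # Opt in to merged mode: the server still builds events but skips its own
    # observability ingestion, leaving delivery to the SDK's merged-event sink.
    "X-Agent-Control-Merge-Events": "true",
}
payload = {
    "agent_name": "agent-000000000001",
    "step": {"type": "llm", "name": "test-step", "input": "hello"},
    "stage": "pre",
}

response = httpx.post(
    "http://localhost:8000/api/v1/evaluation",
    json=payload,
    headers=headers,
)
response.raise_for_status()
print(response.json()["is_safe"])
```

Note that the response body itself is unchanged: as the merge-mode server test further down asserts, OSS merged mode adds no `events` key to the response.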
@@ -238,30 +239,32 @@ async def evaluate( # Calculate total execution time total_duration_ms = (time.perf_counter() - start_time) * 1000 - # Emit observability events if enabled - if observability_settings.enabled: + merge_events_requested = (x_merge_events or "").lower() == "true" + response_events = _build_observability_events( + response=raw_response, + request=request, + trace_id=trace_id, + span_id=span_id, + agent_name=agent_name, + applies_to=applies_to, + control_lookup=control_lookup, + total_duration_ms=total_duration_ms, + ) + + # OSS keeps server-side ingestion as the default. Enterprise merged mode + # returns events to the SDK and skips this server-side delivery step. + if observability_settings.enabled and not merge_events_requested: # Get ingestor from app.state (None if not initialized) try: ingestor = get_event_ingestor(req) except RuntimeError: ingestor = None - - await _emit_observability_events( - response=raw_response, - request=request, - trace_id=trace_id, - span_id=span_id, - agent_name=agent_name, - applies_to=applies_to, - control_lookup=control_lookup, - total_duration_ms=total_duration_ms, - ingestor=ingestor, - ) + await _ingest_observability_events(response_events, ingestor) return _sanitize_evaluation_response(raw_response) -async def _emit_observability_events( +def _build_observability_events( response: EvaluationResponse, request: EvaluationRequest, trace_id: str, @@ -270,12 +273,25 @@ async def _emit_observability_events( applies_to: Literal["llm_call", "tool_call"], control_lookup: dict, total_duration_ms: float, - ingestor: EventIngestor | None, -) -> None: - """Create and enqueue observability events for all evaluated controls. - - Uses control_execution_id from the engine response to ensure correlation - between SDK logs and server observability events. + ) -> list[ControlExecutionEvent]: + """Build observability events for all evaluated controls. + + This preserves the existing server-side event shape while allowing the + merged-event path to skip server-side ingestion and keep the response + lightweight. + + Args: + response: Raw evaluation response from the engine. + request: Original evaluation request. + trace_id: Trace ID to stamp on emitted events. + span_id: Span ID to stamp on emitted events. + agent_name: Agent name to stamp on emitted events. + applies_to: Observability applies_to value derived from the step type. + control_lookup: Controls keyed by control ID. + total_duration_ms: Total request execution duration in milliseconds. + + Returns: + A list of reconstructed server-side control execution events. 
""" events: list[ControlExecutionEvent] = [] now = datetime.now(UTC) @@ -379,11 +395,45 @@ async def _emit_observability_events( ) ) - # Ingest events - if events and ingestor: - result = await ingestor.ingest(events) - if result.dropped > 0: - _logger.warning( - f"Dropped {result.dropped} observability events, " - f"processed {result.processed}" - ) + return events + + +async def _ingest_observability_events( + events: list[ControlExecutionEvent], + ingestor: EventIngestor | None, +) -> None: + """Ingest server-side observability events when OSS batching is active.""" + if not events or ingestor is None: + return + + result = await ingestor.ingest(events) + if result.dropped > 0: + _logger.warning( + f"Dropped {result.dropped} observability events, " + f"processed {result.processed}" + ) + + +async def _emit_observability_events( + response: EvaluationResponse, + request: EvaluationRequest, + trace_id: str, + span_id: str, + agent_name: str, + applies_to: Literal["llm_call", "tool_call"], + control_lookup: dict, + total_duration_ms: float, + ingestor: EventIngestor | None, +) -> None: + """Backward-compatible wrapper around build + ingest observability helpers.""" + events = _build_observability_events( + response=response, + request=request, + trace_id=trace_id, + span_id=span_id, + agent_name=agent_name, + applies_to=applies_to, + control_lookup=control_lookup, + total_duration_ms=total_duration_ms, + ) + await _ingest_observability_events(events, ingestor) diff --git a/server/tests/test_evaluation_error_handling.py b/server/tests/test_evaluation_error_handling.py index 942dca66..dd2035eb 100644 --- a/server/tests/test_evaluation_error_handling.py +++ b/server/tests/test_evaluation_error_handling.py @@ -3,7 +3,13 @@ import uuid from unittest.mock import AsyncMock, MagicMock -from agent_control_models import ControlMatch, EvaluationRequest, EvaluatorResult, Step +from agent_control_models import ( + ControlExecutionEvent, + ControlMatch, + EvaluationRequest, + EvaluatorResult, + Step, +) from fastapi.testclient import TestClient from agent_control_server.endpoints.evaluation import ( @@ -198,8 +204,10 @@ def test_evaluation_observability_receives_raw_errors_while_api_response_is_sani lambda _config: mock_evaluator, ) - emit_mock = AsyncMock() - monkeypatch.setattr(evaluation_module, "_emit_observability_events", emit_mock) + build_mock = MagicMock(return_value=[]) + ingest_mock = AsyncMock() + monkeypatch.setattr(evaluation_module, "_build_observability_events", build_mock) + monkeypatch.setattr(evaluation_module, "_ingest_observability_events", ingest_mock) monkeypatch.setattr(evaluation_module.observability_settings, "enabled", True) # When: sending an evaluation request @@ -220,8 +228,8 @@ def test_evaluation_observability_receives_raw_errors_while_api_response_is_sani assert data["errors"][0]["result"]["error"] == SAFE_EVALUATOR_ERROR # And: observability receives the raw engine response with unsanitized diagnostics - emit_mock.assert_awaited_once() - raw_response = emit_mock.await_args.kwargs["response"] + build_mock.assert_called_once() + raw_response = build_mock.call_args.kwargs["response"] assert raw_response.errors is not None raw_error = raw_response.errors[0] assert raw_error.control_name == control_name @@ -229,6 +237,7 @@ def test_evaluation_observability_receives_raw_errors_while_api_response_is_sani raw_trace = raw_error.result.metadata["condition_trace"] assert raw_trace["error"] == "RuntimeError: Simulated evaluator crash" assert raw_trace["message"] == "Evaluation 
failed: RuntimeError: Simulated evaluator crash" + ingest_mock.assert_awaited_once() def test_sanitize_control_match_redacts_nested_condition_trace_errors() -> None: @@ -372,3 +381,43 @@ async def ingest(self, events): # type: ignore[no-untyped-def] del app.state.event_ingestor else: app.state.event_ingestor = previous_ingestor + + +def test_evaluation_skips_ingest_for_merge_mode( + client: TestClient, monkeypatch +) -> None: + """Merged-event mode should skip server-side observability ingestion.""" + agent_name, _ = create_and_assign_policy(client) + + import agent_control_server.endpoints.evaluation as evaluation_module + + event = ControlExecutionEvent( + trace_id="a" * 32, + span_id="b" * 16, + agent_name=agent_name, + control_id=1, + control_name="test-control", + check_stage="pre", + applies_to="llm_call", + action="deny", + matched=True, + confidence=0.9, + ) + build_mock = MagicMock(return_value=[event]) + ingest_mock = AsyncMock() + monkeypatch.setattr(evaluation_module, "_build_observability_events", build_mock) + monkeypatch.setattr(evaluation_module, "_ingest_observability_events", ingest_mock) + monkeypatch.setattr(evaluation_module.observability_settings, "enabled", True) + + payload = Step(type="llm", name="test-step", input="x", output=None) + req = EvaluationRequest(agent_name=agent_name, step=payload, stage="pre") + resp = client.post( + "/api/v1/evaluation", + json=req.model_dump(mode="json"), + headers={"X-Agent-Control-Merge-Events": "true"}, + ) + + assert resp.status_code == 200 + body = resp.json() + assert "events" not in body + ingest_mock.assert_not_awaited()
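Putting the pieces together end to end, here is a minimal sketch of the OSS flow these tests cover: register the OTEL merged-event sink against an in-memory exporter, push one merged event through it, and inspect the resulting span. The imports, function names, and event fields come from this diff; the concrete event values are illustrative:

```python
from datetime import UTC, datetime

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
    InMemorySpanExporter,
)

from agent_control import emit_control_events
from agent_control.settings import SDKSettings
from agent_control.telemetry.otel import configure_otel_event_sink
from agent_control_models import ControlExecutionEvent

# Collect exported spans in memory so no OTLP collector is needed.
exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
trace.set_tracer_provider(provider)

# With otel_enabled=True and no endpoint, the sink reuses the provider above.
# configure_otel_event_sink returns False if a sink is already registered.
assert configure_otel_event_sink(SDKSettings(otel_enabled=True))

# Emit one merged event through the sink, as check_evaluation_with_local does
# after merging local and server results. Field values here are illustrative.
emit_control_events([
    ControlExecutionEvent(
        control_execution_id="ce-demo",
        trace_id="a" * 32,
        span_id="b" * 16,
        agent_name="otel-merged-events-demo-agent",
        control_id=1,
        control_name="pii_check",
        check_stage="pre",
        applies_to="llm_call",
        action="allow",
        matched=False,
        confidence=0.95,
        timestamp=datetime.now(UTC),
        metadata={},
    )
])

# One span per event, named "control:<control_name>", parented to the
# event's trace/span IDs and stamped with the agent_control.* attributes.
for span in exporter.get_finished_spans():
    print(span.name, span.attributes["agent_control.action"])
```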