From 5b80a7a296b6315227b2756504a5830061073de8 Mon Sep 17 00:00:00 2001 From: ashishsk-kv <145089595+ashishsk-kv@users.noreply.github.com> Date: Thu, 19 Mar 2026 11:24:16 +0530 Subject: [PATCH 1/5] feat: Add support for custom metrics in SDK --- examples/07_custom_metrics/custom_metrics.py | 399 +++++++++++++++++++ netra/__init__.py | 77 +++- netra/config.py | 27 ++ netra/meter.py | 203 ++++++++++ pyproject.toml | 1 + uv.lock | 6 +- 6 files changed, 708 insertions(+), 5 deletions(-) create mode 100644 examples/07_custom_metrics/custom_metrics.py create mode 100644 netra/meter.py diff --git a/examples/07_custom_metrics/custom_metrics.py b/examples/07_custom_metrics/custom_metrics.py new file mode 100644 index 0000000..2ab4194 --- /dev/null +++ b/examples/07_custom_metrics/custom_metrics.py @@ -0,0 +1,399 @@ +""" +Netra SDK — Custom Metrics Example + +Demonstrates how to use Netra's OpenTelemetry-based metrics pipeline +to emit counters, histograms, and up/down counters with rich attribute +dimensions. Combines custom metrics with @workflow/@task decorators +to show a realistic AI-service monitoring pattern. + +Prerequisites: + pip install netra-sdk python-dotenv + + export NETRA_API_KEY='your-api-key' + +Usage: + python custom_metrics.py +""" + +import asyncio +import logging +import os +import random +import sys +import time +from typing import Any, Dict, List + +from dotenv import load_dotenv + +from opentelemetry.metrics import Observation + +from netra import Netra +from netra.decorators import agent, task, workflow + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + stream=sys.stdout, +) +logger = logging.getLogger(__name__) + +load_dotenv() + + +# --------------------------------------------------------------------------- +# 1. Initialise the SDK with metrics enabled +# --------------------------------------------------------------------------- + +def init_sdk() -> None: + Netra.init( + app_name="custom-metrics-example", + environment="development", + trace_content=True, + enable_metrics=True, + headers=f"x-api-key={os.getenv('NETRA_API_KEY', 'demo-key')}", + ) + Netra.set_user_id("metrics_demo_user") + Netra.set_session_id("metrics_demo_session") + logger.info("Netra SDK initialised with metrics pipeline enabled") + + +# --------------------------------------------------------------------------- +# 2. Create instruments from a named Meter +# --------------------------------------------------------------------------- + +def create_instruments(): + """Return a dict of OTel instruments scoped to an 'ai_service' meter.""" + meter = Netra.get_meter("ai_service") + + return { + # How many LLM requests were made, sliced by model / status + "llm_requests": meter.create_counter( + name="ai.llm.requests", + description="Total LLM inference requests", + unit="1", + ), + # Tokens consumed per request (input + output) + "token_usage": meter.create_counter( + name="ai.llm.token_usage", + description="Total tokens consumed", + unit="tokens", + ), + # End-to-end latency distribution + "request_latency": meter.create_histogram( + name="ai.llm.request_latency", + description="LLM request latency", + unit="ms", + ), + # Cost per request in USD-micros + "request_cost": meter.create_histogram( + name="ai.llm.request_cost", + description="Estimated cost per LLM request", + unit="USD_micro", + ), + # Current number of in-flight requests (gauge-like) + "inflight_requests": meter.create_up_down_counter( + name="ai.llm.inflight_requests", + description="Currently in-flight LLM requests", + unit="1", + ), + # Pending items in the task queue + "queue_depth": meter.create_up_down_counter( + name="ai.task_queue.depth", + description="Items waiting in the task queue", + unit="1", + ), + } + + +# --------------------------------------------------------------------------- +# 3. Simulated AI service operations that emit metrics +# --------------------------------------------------------------------------- + +MODELS = ["gpt-4o", "claude-sonnet", "gemini-pro"] +COST_PER_1K_TOKENS = {"gpt-4o": 5.0, "claude-sonnet": 3.0, "gemini-pro": 1.25} + + +@task(name="call_llm") # type: ignore[arg-type] +def call_llm( + prompt: str, + model: str, + instruments: Dict[str, Any], +) -> Dict[str, Any]: + """Simulate an LLM call and record metrics around it.""" + attrs = {"model": model} + + instruments["inflight_requests"].add(1, attrs) + instruments["queue_depth"].add(-1, attrs) + + start = time.perf_counter() + try: + latency_ms = random.uniform(80, 600) + time.sleep(latency_ms / 4000) + + input_tokens = len(prompt.split()) * 2 + output_tokens = random.randint(50, 300) + total_tokens = input_tokens + output_tokens + + cost_micro = total_tokens / 1000 * COST_PER_1K_TOKENS.get(model, 2.0) * 1000 + + if random.random() < 0.05: + raise RuntimeError("Simulated transient LLM error") + + instruments["llm_requests"].add(1, {**attrs, "status": "success"}) + instruments["token_usage"].add(input_tokens, {**attrs, "direction": "input"}) + instruments["token_usage"].add(output_tokens, {**attrs, "direction": "output"}) + instruments["request_latency"].record(latency_ms, attrs) + instruments["request_cost"].record(cost_micro, attrs) + + return { + "model": model, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "latency_ms": round(latency_ms, 2), + "cost_usd": round(cost_micro / 1000, 4), + "response": f"[simulated {model} response to '{prompt[:40]}…']", + } + + except Exception as exc: + instruments["llm_requests"].add(1, {**attrs, "status": "error"}) + elapsed = (time.perf_counter() - start) * 1000 + instruments["request_latency"].record(elapsed, attrs) + raise exc + + finally: + instruments["inflight_requests"].add(-1, attrs) + + +@agent(name="select_model") # type: ignore[arg-type] +def select_model(prompt: str) -> str: + """Pick the best model for a given prompt based on length heuristic.""" + if len(prompt) > 200: + return "gpt-4o" + if len(prompt) > 80: + return "claude-sonnet" + return "gemini-pro" + + +@workflow(name="ai_inference_workflow") # type: ignore[arg-type] +def run_inference( + prompts: List[str], + instruments: Dict[str, Any], +) -> List[Dict[str, Any]]: + """Process a batch of prompts through model selection and LLM calls.""" + logger.info("Starting inference workflow for %d prompts", len(prompts)) + + instruments["queue_depth"].add(len(prompts), {"source": "batch"}) + + results: List[Dict[str, Any]] = [] + for prompt in prompts: + model = select_model(prompt) # type: ignore[misc] + try: + result = call_llm(prompt, model, instruments) # type: ignore[misc] + results.append(result) + except Exception as exc: + logger.warning("Prompt failed: %s", exc) + results.append({"prompt": prompt[:40], "error": str(exc)}) + + successes = sum(1 for r in results if "error" not in r) + logger.info( + "Inference workflow complete: %d/%d succeeded", successes, len(results) + ) + return results + + +# --------------------------------------------------------------------------- +# 4. Observable instruments (callback-based gauges) +# --------------------------------------------------------------------------- + +_simulated_cache_items = 0 + + +def setup_observable_instruments() -> None: + """Register observable gauges that are read on each export cycle.""" + meter = Netra.get_meter("ai_service") + + def _cache_size_callback(_): + global _simulated_cache_items + _simulated_cache_items = random.randint(100, 500) + return [Observation(_simulated_cache_items, {"cache": "prompt_cache"})] + + def _gpu_utilization_callback(_): + return [ + Observation(random.uniform(0.1, 0.95), {"device": "gpu:0"}), + Observation(random.uniform(0.1, 0.95), {"device": "gpu:1"}), + ] + + meter.create_observable_gauge( + name="ai.cache.size", + description="Current prompt-cache entry count", + unit="1", + callbacks=[_cache_size_callback], + ) + + meter.create_observable_gauge( + name="ai.gpu.utilization", + description="GPU utilization ratio (0-1)", + unit="1", + callbacks=[_gpu_utilization_callback], + ) + + logger.info("Observable gauges registered") + + +# --------------------------------------------------------------------------- +# 5. Async example — concurrent requests with shared instruments +# --------------------------------------------------------------------------- + +@task(name="async_llm_call") # type: ignore[arg-type] +async def async_llm_call( + prompt: str, + model: str, + instruments: Dict[str, Any], +) -> Dict[str, Any]: + """Async variant of call_llm for concurrent fan-out.""" + attrs = {"model": model} + instruments["inflight_requests"].add(1, attrs) + instruments["queue_depth"].add(-1, attrs) + + latency_ms = random.uniform(80, 600) + await asyncio.sleep(latency_ms / 4000) + + input_tokens = len(prompt.split()) * 2 + output_tokens = random.randint(50, 300) + cost_micro = (input_tokens + output_tokens) / 1000 * COST_PER_1K_TOKENS.get(model, 2.0) * 1000 + + instruments["llm_requests"].add(1, {**attrs, "status": "success"}) + instruments["token_usage"].add(input_tokens, {**attrs, "direction": "input"}) + instruments["token_usage"].add(output_tokens, {**attrs, "direction": "output"}) + instruments["request_latency"].record(latency_ms, attrs) + instruments["request_cost"].record(cost_micro, attrs) + instruments["inflight_requests"].add(-1, attrs) + + return { + "model": model, + "tokens": input_tokens + output_tokens, + "latency_ms": round(latency_ms, 2), + } + + +@workflow(name="async_inference_workflow") # type: ignore[arg-type] +async def run_async_inference( + prompts: List[str], + instruments: Dict[str, Any], +) -> List[Dict[str, Any]]: + """Fan-out prompts concurrently and collect results.""" + logger.info("Starting async inference for %d prompts", len(prompts)) + instruments["queue_depth"].add(len(prompts), {"source": "async_batch"}) + + tasks = [ + async_llm_call(prompt, select_model(prompt), instruments) # type: ignore[misc] + for prompt in prompts + ] + results = await asyncio.gather(*tasks, return_exceptions=True) + + ok = [r for r in results if isinstance(r, dict)] + logger.info("Async inference complete: %d/%d succeeded", len(ok), len(results)) + return [r if isinstance(r, dict) else {"error": str(r)} for r in results] + + +# --------------------------------------------------------------------------- +# 6. Dedicated-meter example — separate meter per domain +# --------------------------------------------------------------------------- + +def demonstrate_multiple_meters(instruments: Dict[str, Any]) -> None: + """Show that different parts of an app can own independent meters.""" + billing_meter = Netra.get_meter("billing_service") + guardrails_meter = Netra.get_meter("guardrails") + + invoice_total = billing_meter.create_counter( + name="billing.invoice_total", + description="Running invoice total in USD-micros", + unit="USD_micro", + ) + pii_detections = guardrails_meter.create_counter( + name="guardrails.pii_detections", + description="PII detections across inputs", + unit="1", + ) + scan_latency = guardrails_meter.create_histogram( + name="guardrails.scan_latency", + description="Input-scan latency", + unit="ms", + ) + + for _ in range(5): + invoice_total.add(random.randint(500, 5000), {"plan": "pro"}) + pii_detections.add(random.randint(0, 3), {"type": "email"}) + scan_latency.record(random.uniform(1, 15), {"scanner": "presidio"}) + + logger.info("Multiple-meter demo complete") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> None: + print("=" * 65) + print(" Netra SDK — Custom Metrics Example") + print("=" * 65) + + # Initialise + init_sdk() + instruments = create_instruments() + setup_observable_instruments() + + # --- Sync batch --- + print("\n--- Synchronous batch inference ---") + prompts = [ + "Summarise the key points of this quarterly earnings report.", + "Translate to French: Hello world", + "Write a haiku about distributed systems and observability tooling in production.", + "Explain quantum computing to a five-year-old in simple terms.", + "Generate a SQL query to find the top 10 customers by revenue this quarter.", + ] + sync_results = run_inference(prompts, instruments) # type: ignore[misc] + for r in sync_results: + if "error" in r: + print(f" FAIL: {r}") + else: + print( + f" {r['model']:>15} " + f"tokens={r['input_tokens']+r['output_tokens']:<5} " + f"latency={r['latency_ms']}ms " + f"cost=${r['cost_usd']}" + ) + + # --- Async fan-out --- + print("\n--- Async concurrent inference ---") + async_prompts = [f"Async prompt #{i}: tell me something interesting" for i in range(8)] + async_results = asyncio.run( + run_async_inference(async_prompts, instruments) # type: ignore[misc] + ) + for r in async_results: + if "error" in r: + print(f" FAIL: {r}") + else: + print( + f" {r['model']:>15} " + f"tokens={r['tokens']:<5} " + f"latency={r['latency_ms']}ms" + ) + + # --- Multiple meters --- + print("\n--- Multiple independent meters ---") + demonstrate_multiple_meters(instruments) + print(" Billing and guardrails metrics recorded.") + + # --- Summary --- + print("\n" + "=" * 65) + print(" All metrics have been recorded and will be exported on the") + print(" next flush cycle (default 60 s) or at shutdown.") + print("=" * 65) + + Netra.shutdown() + logger.info("SDK shut down — metrics flushed") + + +if __name__ == "__main__": + main() diff --git a/netra/__init__.py b/netra/__init__.py index 033f973..d272f6e 100644 --- a/netra/__init__.py +++ b/netra/__init__.py @@ -4,6 +4,7 @@ from typing import Any, Dict, List, Optional, Set from opentelemetry import context as context_api +from opentelemetry import metrics as otel_metrics from opentelemetry import trace from opentelemetry.trace import SpanKind @@ -13,6 +14,8 @@ from netra.instrumentation import init_instrumentations from netra.instrumentation.instruments import NetraInstruments from netra.logging_utils import configure_package_logging +from netra.meter import MetricsSetup +from netra.meter import get_meter as _get_meter from netra.prompts import Prompts from netra.session_manager import ConversationType, SessionManager from netra.simulation import Simulation @@ -41,6 +44,7 @@ class Netra: _init_lock = threading.RLock() _root_span = None _root_ctx_token = None + _metrics_enabled = False @classmethod def is_initialized(cls) -> bool: @@ -68,6 +72,9 @@ def init( blocked_spans: Optional[List[str]] = None, instruments: Optional[Set[NetraInstruments]] = None, block_instruments: Optional[Set[NetraInstruments]] = None, + enable_metrics: Optional[bool] = None, + metrics_export_interval_ms: Optional[int] = None, + export_auto_metrics: Optional[bool] = None, ) -> None: """ Thread-safe initialization of Netra. @@ -85,6 +92,9 @@ def init( blocked_spans: List of spans to be blocked instruments: Set of instruments to be enabled block_instruments: Set of instruments to be blocked + enable_metrics: Whether to enable OTLP custom metrics export (default: False) + metrics_export_interval_ms: Metrics push interval in milliseconds (default: 60000) + export_auto_metrics: Whether to export OTel auto-instrumented metrics (default: False) Returns: None @@ -106,6 +116,9 @@ def init( environment=environment, enable_scrubbing=enable_scrubbing, blocked_spans=blocked_spans, + enable_metrics=enable_metrics, + metrics_export_interval_ms=metrics_export_interval_ms, + export_auto_metrics=export_auto_metrics, ) # Configure logging based on debug mode @@ -114,6 +127,14 @@ def init( # Initialize tracer (OTLP exporter, span processor, resource) Tracer(cfg) + # Initialize metrics pipeline when explicitly enabled + if cfg.enable_metrics: + try: + MetricsSetup(cfg) + cls._metrics_enabled = True + except Exception as e: + logger.warning("Failed to initialize metrics pipeline: %s", e, exc_info=True) + # Initialize evaluation client and expose as class attribute try: cls.evaluation = Evaluation(cfg) # type:ignore[attr-defined] @@ -184,7 +205,7 @@ def init( @classmethod def shutdown(cls) -> None: - """Optional cleanup to end the root span and detach context.""" + """Flush all pending telemetry and release SDK resources.""" with cls._init_lock: if cls._root_ctx_token is not None: try: @@ -200,7 +221,7 @@ def shutdown(cls) -> None: pass finally: cls._root_span = None - # Try to flush and shutdown the tracer provider to ensure export + # Flush and shutdown the tracer provider try: provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): @@ -209,6 +230,56 @@ def shutdown(cls) -> None: provider.shutdown() except Exception: pass + # Flush and shutdown the metrics provider + if cls._metrics_enabled: + try: + meter_provider = otel_metrics.get_meter_provider() + if hasattr(meter_provider, "force_flush"): + meter_provider.force_flush() + if hasattr(meter_provider, "shutdown"): + meter_provider.shutdown() + except Exception: + pass + + @classmethod + def get_meter(cls, name: str = "netra", version: Optional[str] = None) -> otel_metrics.Meter: + """ + Return an OpenTelemetry ``Meter`` for recording custom metrics. + + This follows the same pattern as Datadog, New Relic, and other OTel-compatible + platforms: a named ``Meter`` is obtained from the global metrics API, backed by + whichever ``MeterProvider`` was installed during ``Netra.init()``. + + If ``enable_metrics=False`` was passed to ``init()``, a no-op ``Meter`` is + returned — instrumented code never needs to guard against ``None``. + + Args: + name: Instrumentation scope name, e.g. ``"payment_service"`` or + ``"my_agent"``. Defaults to ``"netra"``. + version: Optional scope version string. + + Returns: + An OTel ``Meter`` instance. + + Example:: + + meter = Netra.get_meter("order_service") + + # Counter — total requests processed + requests = meter.create_counter("order.requests", unit="1") + requests.add(1, {"status": "success"}) + + # Histogram — end-to-end latency + latency = meter.create_histogram("order.latency_ms", unit="ms") + latency.record(42, {"provider": "stripe"}) + + # Gauge (via UpDownCounter) — queue depth + queue_depth = meter.create_up_down_counter("order.queue_depth", unit="1") + queue_depth.add(5) + """ + if not cls._initialized: + logger.warning("Netra.get_meter() called before Netra.init(); returning no-op meter.") + return _get_meter(name, version) @classmethod def set_session_id(cls, session_id: str) -> None: @@ -322,4 +393,4 @@ def start_span( return SpanWrapper(name, attributes, module_name, as_type=as_type) -__all__ = ["Netra", "UsageModel", "ActionModel", "SpanType", "EvaluationScore", "Prompts"] +__all__ = ["Netra", "UsageModel", "ActionModel", "SpanType", "EvaluationScore", "Prompts", "ConversationType"] diff --git a/netra/config.py b/netra/config.py index e98f71d..e72c6dc 100644 --- a/netra/config.py +++ b/netra/config.py @@ -32,6 +32,9 @@ def __init__( environment: Optional[str] = None, enable_scrubbing: Optional[bool] = None, blocked_spans: Optional[List[str]] = None, + enable_metrics: Optional[bool] = None, + metrics_export_interval_ms: Optional[int] = None, + export_auto_metrics: Optional[bool] = None, ): """ Initialize the configuration. @@ -46,6 +49,9 @@ def __init__( resource_attributes: Custom resource attributes dict (e.g., {'env': 'prod', 'version': '1.0.0'}) enable_scrubbing: Whether to enable pydantic logfire scrubbing (default: False) blocked_spans: List of span names (prefix/suffix patterns) to block from export + enable_metrics: Whether to enable custom metrics export via OTLP (default: False) + metrics_export_interval_ms: How often to push metrics to the collector in ms (default: 60000) + export_auto_metrics: Whether to export OTel auto-instrumented system metrics (default: False) """ self.app_name = self._get_app_name(app_name) self.otlp_endpoint = self._get_otlp_endpoint() @@ -60,10 +66,17 @@ def __init__( self.debug_mode = self._get_bool_config(debug_mode, "NETRA_DEBUG", default=False) self.enable_root_span = self._get_bool_config(enable_root_span, "NETRA_ENABLE_ROOT_SPAN", default=False) self.enable_scrubbing = self._get_bool_config(enable_scrubbing, "NETRA_ENABLE_SCRUBBING", default=False) + self.enable_metrics = self._get_bool_config(enable_metrics, "NETRA_ENABLE_METRICS", default=False) + self.export_auto_metrics = self._get_bool_config( + export_auto_metrics, "NETRA_EXPORT_AUTO_METRICS", default=False + ) self.environment = environment or os.getenv("NETRA_ENV", "default") self.resource_attributes = self._get_resource_attributes(resource_attributes) self.blocked_spans = blocked_spans + self.metrics_export_interval_ms = self._get_int_config( + metrics_export_interval_ms, "NETRA_METRICS_EXPORT_INTERVAL", default=60000 + ) self._set_trace_content_env() @@ -131,6 +144,20 @@ def _get_resource_attributes(self, resource_attributes: Optional[Dict[str, Any]] logger.warning(f"Failed to parse NETRA_RESOURCE_ATTRS: {e}") return {} + def _get_int_config(self, param: Optional[int], env_var: str, default: int) -> int: + """Get integer configuration from parameter or environment variable.""" + if param is not None: + return param + + env_value = os.getenv(env_var) + if env_value is None: + return default + + try: + return int(env_value) + except ValueError: + return default + def _set_trace_content_env(self) -> None: """Set TRACELOOP_TRACE_CONTENT environment variable based on trace_content.""" os.environ["TRACELOOP_TRACE_CONTENT"] = "true" if self.trace_content else "false" diff --git a/netra/meter.py b/netra/meter.py new file mode 100644 index 0000000..0c0ec8b --- /dev/null +++ b/netra/meter.py @@ -0,0 +1,203 @@ +import json +import logging +import threading +from typing import Any, Optional + +from google.protobuf.json_format import MessageToDict +from opentelemetry import metrics +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter, encode_metrics +from opentelemetry.sdk.metrics import ( + Counter, + Histogram, + MeterProvider, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + MetricExportResult, + MetricsData, + PeriodicExportingMetricReader, +) +from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, SERVICE_NAME, Resource + +from netra.config import Config + +logger = logging.getLogger(__name__) + +_provider_install_lock = threading.Lock() + +# Map every OTel instrument type to DELTA so the backend receives +# incremental values on each export cycle, matching standard +# observability platform behavior (Datadog, Prometheus pull model, etc.) +# NOTE: Keys must be the SDK instrument classes (opentelemetry.sdk.metrics), +# not the public API classes (opentelemetry.metrics). +_DELTA_TEMPORALITY: dict = { + Counter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.DELTA, + Histogram: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.DELTA, + ObservableUpDownCounter: AggregationTemporality.DELTA, + ObservableGauge: AggregationTemporality.DELTA, +} + + +class _JsonOTLPMetricExporter(OTLPMetricExporter): + """Thin wrapper that sends OTLP metrics as JSON instead of protobuf. + + The upstream ``OTLPMetricExporter`` serialises to protobuf and sets + ``Content-Type: application/x-protobuf``. The Netra backend currently + only reliably parses the JSON encoding (matching the JS SDK), so this + subclass converts the protobuf ``ExportMetricsServiceRequest`` to its + JSON representation before posting. + """ + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._session.headers["Content-Type"] = "application/json" + + def export( + self, + metrics_data: MetricsData, + timeout_millis: Optional[float] = 10_000, + **kwargs: Any, + ) -> MetricExportResult: + if self._shutdown: + logger.warning("Exporter already shutdown, ignoring batch") + return MetricExportResult.FAILURE + + pb_message = encode_metrics(metrics_data) + payload = MessageToDict(pb_message, preserving_proto_field_name=True) + serialized = json.dumps(payload).encode("utf-8") + + try: + resp = self._session.post( + url=self._endpoint, + data=serialized, + verify=self._certificate_file, + timeout=self._timeout, + cert=getattr(self, "_client_cert", None), + ) + except ConnectionError: + resp = self._session.post( + url=self._endpoint, + data=serialized, + verify=self._certificate_file, + timeout=self._timeout, + cert=getattr(self, "_client_cert", None), + ) + + if resp.ok: + return MetricExportResult.SUCCESS + + logger.error( + "Failed to export metrics batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return MetricExportResult.FAILURE + + +class MetricsSetup: + """ + Configures Netra's OpenTelemetry metrics pipeline. + + Sets up a MeterProvider backed by an OTLPMetricExporter that sends + OTLP/HTTP JSON payloads to ``{otlp_endpoint}/v1/metrics`` at a + configurable interval. Delta temporality is used for all instrument + types so the backend receives incremental values per export cycle. + + Usage:: + + # Inside Netra.init() when enable_metrics=True + MetricsSetup(cfg) + + # In application code + meter = Netra.get_meter("my_service") + counter = meter.create_counter("request_count") + counter.add(1, {"route": "/api/health"}) + """ + + def __init__(self, cfg: Config) -> None: + self.cfg = cfg + self._setup_meter() + + def _setup_meter(self) -> None: + """Install a global MeterProvider with an OTLP exporter.""" + if not self.cfg.otlp_endpoint: + logger.warning( + "OTLP endpoint not configured; metrics pipeline will not be started. " + "Set NETRA_OTLP_ENDPOINT or pass otlp_endpoint to Netra.init()." + ) + return + + with _provider_install_lock: + current_provider = metrics.get_meter_provider() + if isinstance(current_provider, MeterProvider): + logger.info("Reusing existing MeterProvider; skipping metrics setup.") + return + + resource_attrs = { + SERVICE_NAME: self.cfg.app_name, + DEPLOYMENT_ENVIRONMENT: self.cfg.environment, + } + if self.cfg.resource_attributes: + resource_attrs.update(self.cfg.resource_attributes) + + resource = Resource(attributes=resource_attrs) + metrics_endpoint = _format_metrics_endpoint(self.cfg.otlp_endpoint) + + exporter = _JsonOTLPMetricExporter( + endpoint=metrics_endpoint, + headers=self.cfg.headers, + preferred_temporality=_DELTA_TEMPORALITY, + ) + + reader = PeriodicExportingMetricReader( + exporter=exporter, + export_interval_millis=self.cfg.metrics_export_interval_ms, + ) + + provider = MeterProvider(resource=resource, metric_readers=[reader]) + metrics.set_meter_provider(provider) + + logger.info( + "Netra metrics pipeline started (JSON): endpoint=%s, interval=%dms", + metrics_endpoint, + self.cfg.metrics_export_interval_ms, + ) + + +def _format_metrics_endpoint(endpoint: str) -> str: + """Append ``/v1/metrics`` to the base OTLP endpoint if not already present.""" + if not endpoint.endswith("/v1/metrics"): + return endpoint.rstrip("/") + "/v1/metrics" + return endpoint + + +def get_meter(name: str = "netra", version: Optional[str] = None) -> metrics.Meter: + """ + Return an OpenTelemetry ``Meter`` from the global metrics API. + + This is the standard OTel pattern used by Datadog, New Relic, and other + platforms: the global API dispatches to whichever MeterProvider was + installed by ``MetricsSetup``. If metrics were not enabled, a no-op + Meter is returned, so instrumented code never needs to check for ``None``. + + Args: + name: Instrumentation scope name — typically your module or service + name, e.g. ``"order_service"`` or ``"netra"``. + version: Optional instrumentation scope version string. + + Returns: + An OTel ``Meter`` instance. + + Example:: + + meter = Netra.get_meter("payment_service") + latency = meter.create_histogram("payment.latency_ms", unit="ms") + latency.record(42, {"provider": "stripe"}) + """ + return metrics.get_meter(name, version or "") diff --git a/pyproject.toml b/pyproject.toml index 4b2658a..a487623 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ keywords = ["netra", "tracing", "observability", "sdk", "ai", "llm", "vector", " dependencies = [ "opentelemetry-api>=1.34.1,<1.40.0", "opentelemetry-sdk>=1.34.1,<1.40.0", + "opentelemetry-exporter-otlp-proto-http>=0.55b1,<1.40.0", "opentelemetry-instrumentation-fastapi>=0.55b1,<=0.60b1", "traceloop-sdk>=0.45.6,<0.49.2", "opentelemetry-instrumentation-httpx>=0.55b1,<=0.60b1", diff --git a/uv.lock b/uv.lock index c452894..cc77764 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.10, <3.14" resolution-markers = [ "python_full_version >= '3.13'", @@ -1488,12 +1488,13 @@ wheels = [ [[package]] name = "netra-sdk" -version = "0.1.49" +version = "0.1.69" source = { editable = "." } dependencies = [ { name = "dspy" }, { name = "json-repair" }, { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-instrumentation-aio-pika" }, { name = "opentelemetry-instrumentation-aiohttp-client" }, { name = "opentelemetry-instrumentation-aiokafka" }, @@ -1556,6 +1557,7 @@ requires-dist = [ { name = "dspy", specifier = ">=3.0.3" }, { name = "json-repair", specifier = "==0.44.1" }, { name = "opentelemetry-api", specifier = ">=1.34.0,<2.0.0" }, + { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=0.55b1,<2.0.0" }, { name = "opentelemetry-instrumentation-aio-pika", specifier = ">=0.55b1,<1.0.0" }, { name = "opentelemetry-instrumentation-aiohttp-client", specifier = ">=0.55b1,<1.0.0" }, { name = "opentelemetry-instrumentation-aiokafka", specifier = ">=0.55b1,<1.0.0" }, From 8206c1f12611aae548e1db12905d1ff6d797fea4 Mon Sep 17 00:00:00 2001 From: Akash Vijay Date: Mon, 23 Mar 2026 09:26:38 +0530 Subject: [PATCH 2/5] [NET-447] feat: Add root instrument filter processor and default instruments (#227) * [NET-447] feat: Add root instrument filter processor and default instruments * fix: Update the runtime merging of InstrumentSet with static merging --- netra/__init__.py | 28 ++- netra/instrumentation/__init__.py | 18 +- netra/instrumentation/instruments.py | 145 ++++++++++++- netra/processors/__init__.py | 2 + .../instrumentation_span_processor.py | 2 +- .../root_instrument_filter_processor.py | 205 ++++++++++++++++++ netra/tracer.py | 13 +- 7 files changed, 391 insertions(+), 22 deletions(-) create mode 100644 netra/processors/root_instrument_filter_processor.py diff --git a/netra/__init__.py b/netra/__init__.py index d272f6e..9189f2a 100644 --- a/netra/__init__.py +++ b/netra/__init__.py @@ -12,7 +12,7 @@ from netra.dashboard import Dashboard from netra.evaluation import Evaluation from netra.instrumentation import init_instrumentations -from netra.instrumentation.instruments import NetraInstruments +from netra.instrumentation.instruments import DEFAULT_INSTRUMENTS_FOR_ROOT, NetraInstruments from netra.logging_utils import configure_package_logging from netra.meter import MetricsSetup from netra.meter import get_meter as _get_meter @@ -75,6 +75,7 @@ def init( enable_metrics: Optional[bool] = None, metrics_export_interval_ms: Optional[int] = None, export_auto_metrics: Optional[bool] = None, + root_instruments: Optional[Set[NetraInstruments]] = None, ) -> None: """ Thread-safe initialization of Netra. @@ -95,6 +96,12 @@ def init( enable_metrics: Whether to enable OTLP custom metrics export (default: False) metrics_export_interval_ms: Metrics push interval in milliseconds (default: 60000) export_auto_metrics: Whether to export OTel auto-instrumented metrics (default: False) + root_instruments: Set of instruments allowed to produce root-level + spans. When a root span is blocked, its entire subtree is + discarded. Resolution priority: + 1. Explicit ``root_instruments`` value if provided. + 2. The ``instruments`` value if provided (but ``root_instruments`` is not). + 3. ``DEFAULT_INSTRUMENTS_FOR_ROOT`` if neither is provided. Returns: None @@ -124,8 +131,25 @@ def init( # Configure logging based on debug mode configure_package_logging(debug_mode=cfg.debug_mode) + # Resolve root_instruments → set of instrumentation-name strings. + resolved_root: Optional[Set[str]] = None + if root_instruments is not None: + resolved_root = {m.value for m in root_instruments} + elif instruments is not None: + resolved_root = {m.value for m in instruments} + else: + resolved_root = {m.value for m in DEFAULT_INSTRUMENTS_FOR_ROOT} + # Initialize tracer (OTLP exporter, span processor, resource) - Tracer(cfg) + Tracer(cfg, root_instrument_names=resolved_root) + + # Initialize metrics pipeline when explicitly enabled + if cfg.enable_metrics: + try: + MetricsSetup(cfg) + cls._metrics_enabled = True + except Exception as e: + logger.warning("Failed to initialize metrics pipeline: %s", e, exc_info=True) # Initialize metrics pipeline when explicitly enabled if cfg.enable_metrics: diff --git a/netra/instrumentation/__init__.py b/netra/instrumentation/__init__.py index f9ec039..968f90d 100644 --- a/netra/instrumentation/__init__.py +++ b/netra/instrumentation/__init__.py @@ -7,7 +7,7 @@ from traceloop.sdk import Instruments, Telemetry from traceloop.sdk.utils.package_check import is_package_installed -from netra.instrumentation.instruments import CustomInstruments, NetraInstruments +from netra.instrumentation.instruments import DEFAULT_INSTRUMENTS, CustomInstruments, NetraInstruments def init_instrumentations( @@ -18,12 +18,15 @@ def init_instrumentations( ) -> None: from traceloop.sdk.tracing.tracing import init_instrumentations + # When the user does not pass instruments, fall back to the curated default set. + resolved_instruments = instruments if instruments is not None else DEFAULT_INSTRUMENTS + traceloop_instruments = set() traceloop_block_instruments = set() netra_custom_instruments = set() netra_custom_block_instruments = set() - if instruments: - for instrument in instruments: + if resolved_instruments: + for instrument in resolved_instruments: if instrument.origin == CustomInstruments: # type: ignore[attr-defined] netra_custom_instruments.add(getattr(CustomInstruments, instrument.name)) else: @@ -36,18 +39,13 @@ def init_instrumentations( traceloop_block_instruments.add(getattr(Instruments, instrument.name)) # If no instruments in traceloop are provided for instrumentation - if instruments and not traceloop_instruments and not traceloop_block_instruments: + if resolved_instruments and not traceloop_instruments and not traceloop_block_instruments: traceloop_block_instruments = set(Instruments) # If no custom instruments in netra are provided for instrumentation - if instruments and not netra_custom_instruments and not netra_custom_block_instruments: + if resolved_instruments and not netra_custom_instruments and not netra_custom_block_instruments: netra_custom_block_instruments = set(CustomInstruments) - # If no instruments are provided for instrumentation, instrument all instruments - if not instruments and not block_instruments: - traceloop_instruments = set(Instruments) - netra_custom_instruments = set(CustomInstruments) - netra_custom_instruments = netra_custom_instruments - netra_custom_block_instruments traceloop_instruments = traceloop_instruments - traceloop_block_instruments if not traceloop_instruments: diff --git a/netra/instrumentation/instruments.py b/netra/instrumentation/instruments.py index 9a694f7..33b5ba7 100644 --- a/netra/instrumentation/instruments.py +++ b/netra/instrumentation/instruments.py @@ -74,7 +74,7 @@ class CustomInstruments(Enum): CLAUDE_AGENT_SDK = "claude_agent_sdk" -class NetraInstruments(Enum): +class InstrumentSet(Enum): """Custom enum that stores the original enum class in an 'origin' attribute.""" def __new__(cls: Any, value: Any, origin: Any = None) -> Any: @@ -83,16 +83,147 @@ def __new__(cls: Any, value: Any, origin: Any = None) -> Any: member.origin = origin return member + ADK = ("adk", CustomInstruments) + AIOHTTP = ("aiohttp", CustomInstruments) + AIOHTTP_SERVER = ("aiohttp_server", CustomInstruments) + AIO_PIKA = ("aio_pika", CustomInstruments) + AIOKAFKA = ("aiokafka", CustomInstruments) + AIOPG = ("aiopg", CustomInstruments) + ALEPHALPHA = ("alephalpha", Instruments) + ANTHROPIC = ("anthropic", Instruments) + ASGI = ("asgi", CustomInstruments) + ASYNCCLICK = ("asyncclick", CustomInstruments) + ASYNCIO = ("asyncio", CustomInstruments) + ASYNCPG = ("asyncpg", CustomInstruments) + AWS_LAMBDA = ("aws_lambda", CustomInstruments) + BEDROCK = ("bedrock", Instruments) + BOTO = ("boto", CustomInstruments) + BOTO3SQS = ("boto3sqs", CustomInstruments) + BOTOCORE = ("botocore", CustomInstruments) + CARTESIA = ("cartesia", CustomInstruments) + CASSANDRA = ("cassandra", CustomInstruments) + CEREBRAS = ("cerebras", CustomInstruments) + CELERY = ("celery", CustomInstruments) + CHROMA = ("chroma", Instruments) + CLICK = ("click", CustomInstruments) + COHEREAI = ("cohere_ai", CustomInstruments) + CONFLUENT_KAFKA = ("confluent_kafka", CustomInstruments) + CREWAI = ("crewai", Instruments) + DEEPGRAM = ("deepgram", CustomInstruments) + DBAPI = ("dbapi", CustomInstruments) + DJANGO = ("django", CustomInstruments) + DSPY = ("dspy", CustomInstruments) + ELASTICSEARCH = ("elasticsearch", CustomInstruments) + ELEVENLABS = ("elevenlabs", CustomInstruments) + FALCON = ("falcon", CustomInstruments) + FASTAPI = ("fastapi", CustomInstruments) + FLASK = ("flask", CustomInstruments) + GOOGLE_GENERATIVEAI = ("google_genai", CustomInstruments) + GROQ = ("groq", CustomInstruments) + GRPC = ("grpc", CustomInstruments) + HAYSTACK = ("haystack", Instruments) + HTTPX = ("httpx", CustomInstruments) + JINJA2 = ("jinja2", CustomInstruments) + KAFKA_PYTHON = ("kafka_python", CustomInstruments) + LANCEDB = ("lancedb", Instruments) + LANGCHAIN = ("langchain", Instruments) + LITELLM = ("litellm", CustomInstruments) + LLAMA_INDEX = ("llama_index", Instruments) + LOGGING = ("logging", CustomInstruments) + MARQO = ("marqo", Instruments) + MCP = ("mcp", Instruments) + MILVUS = ("milvus", Instruments) + MISTRALAI = ("mistral_ai", CustomInstruments) + MYSQL = ("mysql", CustomInstruments) + MYSQLCLIENT = ("mysqlclient", CustomInstruments) + OLLAMA = ("ollama", Instruments) + OPENAI = ("openai", CustomInstruments) + OPENAI_AGENTS = ("openai_agents", Instruments) + PIKA = ("pika", CustomInstruments) + PINECONE = ("pinecone", Instruments) + PSYCOPG = ("psycopg", CustomInstruments) + PSYCOPG2 = ("psycopg2", CustomInstruments) + PYDANTIC_AI = ("pydantic_ai", CustomInstruments) + PYMEMCACHE = ("pymemcache", CustomInstruments) + PYMONGO = ("pymongo", CustomInstruments) + PYMSSQL = ("pymssql", CustomInstruments) + PYMYSQL = ("pymysql", CustomInstruments) + PYRAMID = ("pyramid", CustomInstruments) + QDRANTDB = ("qdrant_db", CustomInstruments) + REDIS = ("redis", CustomInstruments) + REMOULADE = ("remoulade", CustomInstruments) + REPLICATE = ("replicate", Instruments) + REQUESTS = ("requests", CustomInstruments) + SAGEMAKER = ("sagemaker", Instruments) + SQLALCHEMY = ("sqlalchemy", CustomInstruments) + SQLITE3 = ("sqlite3", CustomInstruments) + STARLETTE = ("starlette", CustomInstruments) + SYSTEM_METRICS = ("system_metrics", CustomInstruments) + THREADING = ("threading", CustomInstruments) + TOGETHER = ("together", Instruments) + TORNADO = ("tornado", CustomInstruments) + TORTOISEORM = ("tortoiseorm", CustomInstruments) + TRANSFORMERS = ("transformers", Instruments) + URLLIB = ("urllib", CustomInstruments) + URLLIB3 = ("urllib3", CustomInstruments) + VERTEXAI = ("vertexai", Instruments) + WATSONX = ("watsonx", Instruments) + WEAVIATEDB = ("weaviate_db", CustomInstruments) + WRITER = ("writer", Instruments) + WSGI = ("wsgi", CustomInstruments) + -merged_members = {} +NetraInstruments = InstrumentSet -for member in Instruments: - merged_members[member.name] = (member.value, Instruments) -for member in CustomInstruments: - merged_members[member.name] = (member.value, CustomInstruments) +# Curated default instrument set used for root_instruments when the user does +# not pass an explicit value. Covers core LLM/AI providers and frameworks. +DEFAULT_INSTRUMENTS_FOR_ROOT = { + InstrumentSet.ANTHROPIC, + InstrumentSet.CARTESIA, + InstrumentSet.COHEREAI, + InstrumentSet.CREWAI, + InstrumentSet.DEEPGRAM, + InstrumentSet.ELEVENLABS, + InstrumentSet.GOOGLE_GENERATIVEAI, + InstrumentSet.ADK, + InstrumentSet.GROQ, + InstrumentSet.LANGCHAIN, + InstrumentSet.LITELLM, + InstrumentSet.CEREBRAS, + InstrumentSet.MISTRALAI, + InstrumentSet.OPENAI, + InstrumentSet.OLLAMA, + InstrumentSet.VERTEXAI, + InstrumentSet.LLAMA_INDEX, + InstrumentSet.PYDANTIC_AI, + InstrumentSet.DSPY, + InstrumentSet.HAYSTACK, + InstrumentSet.BEDROCK, + InstrumentSet.TOGETHER, + InstrumentSet.REPLICATE, + InstrumentSet.ALEPHALPHA, + InstrumentSet.WATSONX, +} -InstrumentSet = NetraInstruments("InstrumentSet", merged_members) +# Broader default instrument set used for the ``instruments`` parameter when +# the user does not pass an explicit value. Includes the root defaults plus +# common vector DBs, HTTP client/server, and database ORM/client libraries. +DEFAULT_INSTRUMENTS = DEFAULT_INSTRUMENTS_FOR_ROOT.union( + { + InstrumentSet.PINECONE, + InstrumentSet.CHROMA, + InstrumentSet.WEAVIATEDB, + InstrumentSet.QDRANTDB, + InstrumentSet.MILVUS, + InstrumentSet.LANCEDB, + InstrumentSet.MARQO, + InstrumentSet.PYMYSQL, + InstrumentSet.REQUESTS, + InstrumentSet.SQLALCHEMY, + InstrumentSet.HTTPX, + } +) ##################################################################################### diff --git a/netra/processors/__init__.py b/netra/processors/__init__.py index fb3b0ee..7c8e43f 100644 --- a/netra/processors/__init__.py +++ b/netra/processors/__init__.py @@ -1,6 +1,7 @@ from netra.processors.instrumentation_span_processor import InstrumentationSpanProcessor from netra.processors.llm_trace_identifier_span_processor import LlmTraceIdentifierSpanProcessor from netra.processors.local_filtering_span_processor import LocalFilteringSpanProcessor +from netra.processors.root_instrument_filter_processor import RootInstrumentFilterProcessor from netra.processors.scrubbing_span_processor import ScrubbingSpanProcessor from netra.processors.session_span_processor import SessionSpanProcessor @@ -10,4 +11,5 @@ "LlmTraceIdentifierSpanProcessor", "ScrubbingSpanProcessor", "LocalFilteringSpanProcessor", + "RootInstrumentFilterProcessor", ] diff --git a/netra/processors/instrumentation_span_processor.py b/netra/processors/instrumentation_span_processor.py index 35e9c81..0ad66d6 100644 --- a/netra/processors/instrumentation_span_processor.py +++ b/netra/processors/instrumentation_span_processor.py @@ -62,7 +62,7 @@ def _get_blocked_url_patterns() -> frozenset[str]: # Pre-computed allowed instrumentation names -_ALLOWED_INSTRUMENTATION_NAMES: Set[str] = {member.value for member in InstrumentSet} # type: ignore[attr-defined] +_ALLOWED_INSTRUMENTATION_NAMES: Set[str] = {member.value for member in InstrumentSet} class InstrumentationSpanProcessor(SpanProcessor): # type: ignore[misc] diff --git a/netra/processors/root_instrument_filter_processor.py b/netra/processors/root_instrument_filter_processor.py new file mode 100644 index 0000000..9b564e4 --- /dev/null +++ b/netra/processors/root_instrument_filter_processor.py @@ -0,0 +1,205 @@ +import logging +import threading +from typing import Optional, Set, cast + +from opentelemetry import context as otel_context +from opentelemetry import trace +from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor +from opentelemetry.trace import INVALID_SPAN_ID + +logger = logging.getLogger(__name__) + +# Attribute written on blocked spans so that the FilteringSpanExporter drops them. +_LOCAL_BLOCKED_ATTR = "netra.local_blocked" + + +class RootInstrumentFilterProcessor(SpanProcessor): # type: ignore[misc] + """Blocks root spans (and their entire subtree) from instrumentations not in + the allowed *root_instruments* set. + + The set stores the **instrumentation name values** (e.g. ``"openai"``, + ``"adk"``, ``"google_genai"``) that are permitted to create root-level spans. + Any root span whose instrumentation name is *not* in this set is marked with + ``netra.local_blocked = True`` and its ``span_id`` is recorded. Child spans + whose parent ``span_id`` appears in the blocked registry inherit the block. + + Args: + allowed_root_instrument_names: Set of instrumentation-name strings + (matching ``InstrumentSet`` member values) that are allowed to + produce root spans. + """ + + def __init__(self, allowed_root_instrument_names: Set[str]) -> None: + """ + Initialize the processor with a set of allowed root instrument names. + + Args: + allowed_root_instrument_names: Set of instrumentation-name strings + (matching ``InstrumentSet`` member values) that are allowed to + produce root spans. + """ + self._allowed: frozenset[str] = frozenset(allowed_root_instrument_names) + # span_id -> True for every span that belongs to a blocked root tree. + self._blocked_span_ids: dict[int, bool] = {} + self._lock = threading.Lock() + + def on_start( + self, + span: Span, + parent_context: Optional[otel_context.Context] = None, + ) -> None: + """ + Called when a span is started. + + Args: + span: The span that is being started. + parent_context: The parent context of the span. + """ + try: + self._process_span_start(span, parent_context) + except Exception: + logger.debug("RootInstrumentFilterProcessor.on_start failed", exc_info=True) + + def on_end(self, span: ReadableSpan) -> None: + """ + Called when a span is ended. + + Args: + span: The span that is being ended. + """ + try: + span_id = self._get_span_id(span) + if span_id is not None: + with self._lock: + self._blocked_span_ids.pop(span_id, None) + except Exception: + pass + + def shutdown(self) -> None: + """ + Called when the processor is shut down. + """ + with self._lock: + self._blocked_span_ids.clear() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """ + Called when the processor is forced to flush. + + Args: + timeout_millis: The timeout in milliseconds. + + Returns: + True if the flush was successful, False otherwise. + """ + return True + + def _process_span_start( + self, + span: Span, + parent_context: Optional[otel_context.Context], + ) -> None: + """ + Processes the start of a span. + + Args: + span: The span that is being started. + parent_context: The parent context of the span. + """ + parent_span_id = self._resolve_parent_span_id(parent_context) + + if parent_span_id is not None and parent_span_id != INVALID_SPAN_ID: + # This is a child span – inherit blocked status from parent. + with self._lock: + if parent_span_id in self._blocked_span_ids: + own_id = self._get_span_id(span) + if own_id is not None: + self._blocked_span_ids[own_id] = True + self._mark_blocked(span) + return + + # Root span – check instrumentation name against the allow-list. + instr_name = self._extract_instrumentation_name(span) + if instr_name is not None and instr_name not in self._allowed: + own_id = self._get_span_id(span) + if own_id is not None: + with self._lock: + self._blocked_span_ids[own_id] = True + self._mark_blocked(span) + + @staticmethod + def _resolve_parent_span_id( + parent_context: Optional[otel_context.Context], + ) -> Optional[int]: + """ + Return the parent span's ``span_id`` from the supplied context, or ``None``. + + Args: + parent_context: The parent context of the span. + + Returns: + The parent span's ``span_id`` or ``None``. + """ + if parent_context is None: + return None + parent_span = trace.get_current_span(parent_context) + if parent_span is None: + return None + sc = parent_span.get_span_context() + if sc is None: + return None + return cast(Optional[int], sc.span_id) + + @staticmethod + def _get_span_id(span: object) -> Optional[int]: + """ + Get the span ID from the span. + + Args: + span: The span to get the ID from. + + Returns: + The span ID or None. + """ + ctx = getattr(span, "context", None) or getattr(span, "get_span_context", lambda: None)() + if ctx is None: + return None + return cast(Optional[int], getattr(ctx, "span_id", None)) + + @staticmethod + def _mark_blocked(span: Span) -> None: + """ + Mark the span as blocked. + + Args: + span: The span to mark as blocked. + """ + try: + span.set_attribute(_LOCAL_BLOCKED_ATTR, True) + except Exception: + pass + + @staticmethod + def _extract_instrumentation_name(span: Span) -> Optional[str]: + """ + Extract the short instrumentation name from the span's scope. + + Mirrors the logic in ``InstrumentationSpanProcessor._extract_instrumentation_name``. + + Args: + span: The span to extract the instrumentation name from. + + Returns: + The instrumentation name or None. + """ + scope = getattr(span, "instrumentation_scope", None) + if scope is None: + return None + name = getattr(scope, "name", None) + if not isinstance(name, str) or not name: + return None + for prefix in ("opentelemetry.instrumentation.", "netra.instrumentation."): + if name.startswith(prefix): + base = name.rsplit(".", 1)[-1].strip() + return base if base else name + return name diff --git a/netra/tracer.py b/netra/tracer.py index eb3d9c9..1918f32 100644 --- a/netra/tracer.py +++ b/netra/tracer.py @@ -1,6 +1,6 @@ import logging import threading -from typing import Any, Dict +from typing import Any, Dict, Optional, Set from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter @@ -27,13 +27,18 @@ class Tracer: and appropriate span processors. """ - def __init__(self, cfg: Config) -> None: + def __init__(self, cfg: Config, root_instrument_names: Optional[Set[str]] = None) -> None: """Initialize the Netra tracer with the provided configuration. Args: cfg: Configuration object with tracer settings + root_instrument_names: Optional set of instrumentation-name strings + that are allowed to produce root-level spans. When provided, a + ``RootInstrumentFilterProcessor`` is installed that discards root + spans (and their entire subtree) from all other instrumentations. """ self.cfg = cfg + self._root_instrument_names = root_instrument_names self._setup_tracer() def _setup_tracer(self) -> None: @@ -93,10 +98,14 @@ def _setup_tracer(self) -> None: InstrumentationSpanProcessor, LlmTraceIdentifierSpanProcessor, LocalFilteringSpanProcessor, + RootInstrumentFilterProcessor, ScrubbingSpanProcessor, SessionSpanProcessor, ) + if self._root_instrument_names is not None: + provider.add_span_processor(RootInstrumentFilterProcessor(self._root_instrument_names)) + provider.add_span_processor(LocalFilteringSpanProcessor()) provider.add_span_processor(InstrumentationSpanProcessor()) provider.add_span_processor(SessionSpanProcessor()) From bbabcec202edc7a3dd95113446ff51b1a0deb051 Mon Sep 17 00:00:00 2001 From: Nithish-KV Date: Mon, 23 Mar 2026 14:04:13 +0530 Subject: [PATCH 3/5] [NET-447] refactor: Cleanup and reformat claude agent sdk instrumentation (#231) * [NET-447] refactor: Cleanup and reformat claude agent sdk instrumentation * fix: Update library version --- CHANGELOG.md | 7 ++- examples/07_custom_metrics/custom_metrics.py | 25 +++------ netra/instrumentation/__init__.py | 4 +- .../claude_agent_sdk/__init__.py | 55 ++++++++----------- .../instrumentation/claude_agent_sdk/utils.py | 18 +++--- .../claude_agent_sdk/version.py | 2 +- .../claude_agent_sdk/wrappers.py | 20 ++++--- netra/meter.py | 4 +- .../instrumentation_span_processor.py | 5 +- netra/version.py | 2 +- pyproject.toml | 2 +- 11 files changed, 68 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a96eb9..97457cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on Keep a Changelog and this project adheres to Semantic Versioning. +## [0.1.77] - 2026-03-23 + +- Cleanup and refactor claude agent sdk instrumentation + + ## [0.1.76] - 2026-03-19 - Update block instrument functionality to correctly block Redis and SQLAlchemy @@ -211,4 +216,4 @@ The format is based on Keep a Changelog and this project adheres to Semantic Ver - Added utility to set input and output data for any active span in a trace -[0.1.76]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main +[0.1.77]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main diff --git a/examples/07_custom_metrics/custom_metrics.py b/examples/07_custom_metrics/custom_metrics.py index 2ab4194..60a7900 100644 --- a/examples/07_custom_metrics/custom_metrics.py +++ b/examples/07_custom_metrics/custom_metrics.py @@ -24,7 +24,6 @@ from typing import Any, Dict, List from dotenv import load_dotenv - from opentelemetry.metrics import Observation from netra import Netra @@ -44,6 +43,7 @@ # 1. Initialise the SDK with metrics enabled # --------------------------------------------------------------------------- + def init_sdk() -> None: Netra.init( app_name="custom-metrics-example", @@ -61,6 +61,7 @@ def init_sdk() -> None: # 2. Create instruments from a named Meter # --------------------------------------------------------------------------- + def create_instruments(): """Return a dict of OTel instruments scoped to an 'ai_service' meter.""" meter = Netra.get_meter("ai_service") @@ -195,9 +196,7 @@ def run_inference( results.append({"prompt": prompt[:40], "error": str(exc)}) successes = sum(1 for r in results if "error" not in r) - logger.info( - "Inference workflow complete: %d/%d succeeded", successes, len(results) - ) + logger.info("Inference workflow complete: %d/%d succeeded", successes, len(results)) return results @@ -244,6 +243,7 @@ def _gpu_utilization_callback(_): # 5. Async example — concurrent requests with shared instruments # --------------------------------------------------------------------------- + @task(name="async_llm_call") # type: ignore[arg-type] async def async_llm_call( prompt: str, @@ -285,10 +285,7 @@ async def run_async_inference( logger.info("Starting async inference for %d prompts", len(prompts)) instruments["queue_depth"].add(len(prompts), {"source": "async_batch"}) - tasks = [ - async_llm_call(prompt, select_model(prompt), instruments) # type: ignore[misc] - for prompt in prompts - ] + tasks = [async_llm_call(prompt, select_model(prompt), instruments) for prompt in prompts] # type: ignore[misc] results = await asyncio.gather(*tasks, return_exceptions=True) ok = [r for r in results if isinstance(r, dict)] @@ -300,6 +297,7 @@ async def run_async_inference( # 6. Dedicated-meter example — separate meter per domain # --------------------------------------------------------------------------- + def demonstrate_multiple_meters(instruments: Dict[str, Any]) -> None: """Show that different parts of an app can own independent meters.""" billing_meter = Netra.get_meter("billing_service") @@ -333,6 +331,7 @@ def demonstrate_multiple_meters(instruments: Dict[str, Any]) -> None: # Main # --------------------------------------------------------------------------- + def main() -> None: print("=" * 65) print(" Netra SDK — Custom Metrics Example") @@ -367,18 +366,12 @@ def main() -> None: # --- Async fan-out --- print("\n--- Async concurrent inference ---") async_prompts = [f"Async prompt #{i}: tell me something interesting" for i in range(8)] - async_results = asyncio.run( - run_async_inference(async_prompts, instruments) # type: ignore[misc] - ) + async_results = asyncio.run(run_async_inference(async_prompts, instruments)) # type: ignore[misc] for r in async_results: if "error" in r: print(f" FAIL: {r}") else: - print( - f" {r['model']:>15} " - f"tokens={r['tokens']:<5} " - f"latency={r['latency_ms']}ms" - ) + print(f" {r['model']:>15} " f"tokens={r['tokens']:<5} " f"latency={r['latency_ms']}ms") # --- Multiple meters --- print("\n--- Multiple independent meters ---") diff --git a/netra/instrumentation/__init__.py b/netra/instrumentation/__init__.py index 968f90d..0d073f9 100644 --- a/netra/instrumentation/__init__.py +++ b/netra/instrumentation/__init__.py @@ -60,7 +60,7 @@ def init_instrumentations( Instruments.OPENAI, Instruments.GROQ, Instruments.REDIS, - Instruments.PYMYSQL + Instruments.PYMYSQL, } ) @@ -1366,4 +1366,4 @@ def init_claude_agent_sdk_instrumentation() -> bool: except Exception as e: logging.error(f"Error initializing Claude Agent SDK instrumentor: {e}") Telemetry().log_exception(e) - return False \ No newline at end of file + return False diff --git a/netra/instrumentation/claude_agent_sdk/__init__.py b/netra/instrumentation/claude_agent_sdk/__init__.py index 894137b..47a52b8 100644 --- a/netra/instrumentation/claude_agent_sdk/__init__.py +++ b/netra/instrumentation/claude_agent_sdk/__init__.py @@ -1,22 +1,21 @@ -import wrapt import logging -from opentelemetry.trace import Tracer, get_tracer -from opentelemetry.instrumentation.utils import unwrap +from typing import Any + +import wrapt from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.trace import Tracer, get_tracer from netra.instrumentation.claude_agent_sdk.version import __version__ -from netra.instrumentation.claude_agent_sdk.wrappers import ( - client_query_wrapper, - client_response_wrapper, - query_wrapper -) +from netra.instrumentation.claude_agent_sdk.wrappers import client_query_wrapper, client_response_wrapper, query_wrapper logger = logging.getLogger(__name__) -_instruments = ("claude_agent_sdk >= 0.1.0", ) +_instruments = ("claude_agent_sdk >= 0.1.0",) + -class NetraClaudeAgentSDKInstrumentor(BaseInstrumentor): - def instrumentation_dependencies(self): +class NetraClaudeAgentSDKInstrumentor(BaseInstrumentor): # type: ignore[misc] + def instrumentation_dependencies(self) -> tuple[str, ...]: """ Return the list of packages required for this instrumentation to function. @@ -27,8 +26,8 @@ def instrumentation_dependencies(self): tuple: A tuple of pip requirement strings for the instrumented library. """ return _instruments - - def _instrument(self, **kwargs): + + def _instrument(self, **kwargs: Any) -> None: """ Set up OpenTelemetry instrumentation for the Claude Agent SDK. @@ -52,7 +51,7 @@ def _instrument(self, **kwargs): self._instrument_client_query(tracer) self._instrument_client_response(tracer) - def _uninstrument(self, **kwargs): + def _uninstrument(self, **kwargs: Any) -> None: """ Remove all custom instrumentation wrappers from the Claude Agent SDK. @@ -66,7 +65,7 @@ def _uninstrument(self, **kwargs): self._uninstrument_client_query() self._uninstrument_client_response() - def _instrument_query(self, tracer: Tracer): + def _instrument_query(self, tracer: Tracer) -> None: """ Wrap InternalClient.process_query with a tracing wrapper. @@ -78,14 +77,12 @@ def _instrument_query(self, tracer: Tracer): """ try: wrapt.wrap_function_wrapper( - "claude_agent_sdk._internal.client", - "InternalClient.process_query", - query_wrapper(tracer) + "claude_agent_sdk._internal.client", "InternalClient.process_query", query_wrapper(tracer) ) except Exception as e: logger.error(f"Failed to instrument claude-agent-sdk query: {e}") - def _instrument_client_query(self, tracer: Tracer): + def _instrument_client_query(self, tracer: Tracer) -> None: """ Wrap ClaudeSDKClient.query to capture the prompt for downstream tracing. @@ -96,15 +93,11 @@ def _instrument_client_query(self, tracer: Tracer): None """ try: - wrapt.wrap_function_wrapper( - "claude_agent_sdk.client", - "ClaudeSDKClient.query", - client_query_wrapper() - ) + wrapt.wrap_function_wrapper("claude_agent_sdk.client", "ClaudeSDKClient.query", client_query_wrapper()) except Exception as e: logger.error(f"Failed to instrument claude-sdk-client query: {e}") - def _instrument_client_response(self, tracer: Tracer): + def _instrument_client_response(self, tracer: Tracer) -> None: """ Wrap ClaudeSDKClient.receive_messages with a tracing wrapper. @@ -116,14 +109,12 @@ def _instrument_client_response(self, tracer: Tracer): """ try: wrapt.wrap_function_wrapper( - "claude_agent_sdk.client", - "ClaudeSDKClient.receive_messages", - client_response_wrapper(tracer) + "claude_agent_sdk.client", "ClaudeSDKClient.receive_messages", client_response_wrapper(tracer) ) except Exception as e: logger.error(f"Failed to instrument claude-sdk-client response: {e}") - def _uninstrument_query(self): + def _uninstrument_query(self) -> None: """ Remove the tracing wrapper from InternalClient.process_query. @@ -138,7 +129,7 @@ def _uninstrument_query(self): except (AttributeError, ModuleNotFoundError): logger.error(f"Failed to uninstrument claude-agent-sdk query") - def _uninstrument_client_query(self): + def _uninstrument_client_query(self) -> None: """ Remove the tracing wrapper from ClaudeSDKClient.query. @@ -153,7 +144,7 @@ def _uninstrument_client_query(self): except (AttributeError, ModuleNotFoundError): logger.error(f"Failed to uninstrument claude-sdk-client query") - def _uninstrument_client_response(self): + def _uninstrument_client_response(self) -> None: """ Remove the tracing wrapper from ClaudeSDKClient.receive_messages. @@ -166,4 +157,4 @@ def _uninstrument_client_response(self): try: unwrap("claude_agent_sdk.client", "ClaudeSDKClient.receive_messages") except (AttributeError, ModuleNotFoundError): - logger.error(f"Failed to uninstrument claude-sdk-client response") \ No newline at end of file + logger.error(f"Failed to uninstrument claude-sdk-client response") diff --git a/netra/instrumentation/claude_agent_sdk/utils.py b/netra/instrumentation/claude_agent_sdk/utils.py index f63af73..96628b8 100644 --- a/netra/instrumentation/claude_agent_sdk/utils.py +++ b/netra/instrumentation/claude_agent_sdk/utils.py @@ -2,20 +2,22 @@ import logging import threading from typing import Any -from opentelemetry.context import Context -from opentelemetry.trace import Span, Tracer -from opentelemetry.semconv_ai import SpanAttributes + from claude_agent_sdk import ( - ClaudeAgentOptions, - SystemMessage, AssistantMessage, - UserMessage, + ClaudeAgentOptions, ResultMessage, + SystemMessage, TextBlock, ThinkingBlock, - ToolUseBlock, ToolResultBlock, + ToolUseBlock, + UserMessage, ) +from opentelemetry.context import Context +from opentelemetry.semconv_ai import SpanAttributes +from opentelemetry.trace import Span, Tracer + from netra.config import Config logger = logging.getLogger(__name__) @@ -67,7 +69,7 @@ def _set_conversation(span: Span, role: str, content: str, prompt_index: int = 0 return prompt_index -def _set_usage(span: Span, usage: dict) -> None: +def _set_usage(span: Span, usage: dict[str, Any]) -> None: """ Write token usage attributes to the span. diff --git a/netra/instrumentation/claude_agent_sdk/version.py b/netra/instrumentation/claude_agent_sdk/version.py index d538f87..5becc17 100644 --- a/netra/instrumentation/claude_agent_sdk/version.py +++ b/netra/instrumentation/claude_agent_sdk/version.py @@ -1 +1 @@ -__version__ = "1.0.0" \ No newline at end of file +__version__ = "1.0.0" diff --git a/netra/instrumentation/claude_agent_sdk/wrappers.py b/netra/instrumentation/claude_agent_sdk/wrappers.py index 25eb13d..cf6cb78 100644 --- a/netra/instrumentation/claude_agent_sdk/wrappers.py +++ b/netra/instrumentation/claude_agent_sdk/wrappers.py @@ -1,17 +1,18 @@ import logging from typing import Any, AsyncIterator, Callable, Tuple + +from claude_agent_sdk import AssistantMessage, ResultMessage, SystemMessage, UserMessage from opentelemetry import trace from opentelemetry.context import Context from opentelemetry.trace import Span, SpanKind, Tracer from opentelemetry.trace.status import Status, StatusCode -from claude_agent_sdk import SystemMessage, AssistantMessage, UserMessage, ResultMessage from netra.instrumentation.claude_agent_sdk.utils import ( + set_assistant_message_attributes, set_request_attributes, + set_result_message_attributes, set_system_message_attributes, - set_assistant_message_attributes, set_user_message_attributes, - set_result_message_attributes, ) logger = logging.getLogger(__name__) @@ -24,9 +25,9 @@ async def _dispatch_messages( tracer: Tracer, root_span: Span, root_ctx: Context, - aiterator: AsyncIterator, + aiterator: AsyncIterator[Any], prompt_index: int, -) -> AsyncIterator: +) -> AsyncIterator[Any]: """ Dispatch each incoming SDK message to its span attribute handler and yield it. @@ -61,7 +62,7 @@ async def _dispatch_messages( yield message -def query_wrapper(tracer: Tracer): +def query_wrapper(tracer: Tracer) -> Callable[..., Any]: """ Return a wrapper that traces a single InternalClient.process_query call with child spans per message. @@ -71,6 +72,7 @@ def query_wrapper(tracer: Tracer): Returns: Callable: An async generator wrapper function for InternalClient.process_query. """ + async def wrapper( wrapped: Callable[..., Any], instance: Any, @@ -126,7 +128,7 @@ async def wrapper( return wrapper -def client_query_wrapper(): +def client_query_wrapper() -> Callable[..., Any]: """ Return a wrapper that captures the prompt from ClaudeSDKClient.query for later tracing. @@ -136,6 +138,7 @@ def client_query_wrapper(): Returns: Callable: An async wrapper function for ClaudeSDKClient.query. """ + async def wrapper( wrapped: Callable[..., Any], instance: Any, @@ -160,7 +163,7 @@ async def wrapper( return wrapper -def client_response_wrapper(tracer: Tracer): +def client_response_wrapper(tracer: Tracer) -> Callable[..., Any]: """ Return a wrapper that traces a full ClaudeSDKClient.receive_messages call covering all messages. @@ -170,6 +173,7 @@ def client_response_wrapper(tracer: Tracer): Returns: Callable: An async generator wrapper function for ClaudeSDKClient.receive_messages. """ + async def wrapper( wrapped: Callable[..., Any], instance: Any, diff --git a/netra/meter.py b/netra/meter.py index 0c0ec8b..ee341b5 100644 --- a/netra/meter.py +++ b/netra/meter.py @@ -34,7 +34,7 @@ # observability platform behavior (Datadog, Prometheus pull model, etc.) # NOTE: Keys must be the SDK instrument classes (opentelemetry.sdk.metrics), # not the public API classes (opentelemetry.metrics). -_DELTA_TEMPORALITY: dict = { +_DELTA_TEMPORALITY: dict[type, AggregationTemporality] = { Counter: AggregationTemporality.DELTA, UpDownCounter: AggregationTemporality.DELTA, Histogram: AggregationTemporality.DELTA, @@ -44,7 +44,7 @@ } -class _JsonOTLPMetricExporter(OTLPMetricExporter): +class _JsonOTLPMetricExporter(OTLPMetricExporter): # type: ignore[misc] """Thin wrapper that sends OTLP metrics as JSON instead of protobuf. The upstream ``OTLPMetricExporter`` serialises to protobuf and sets diff --git a/netra/processors/instrumentation_span_processor.py b/netra/processors/instrumentation_span_processor.py index 0ad66d6..dd249fe 100644 --- a/netra/processors/instrumentation_span_processor.py +++ b/netra/processors/instrumentation_span_processor.py @@ -115,10 +115,7 @@ def _wrap_set_attribute(self, span: Span) -> None: span: The span whose set_attribute method will be wrapped. """ original_set_attribute: SetAttributeFunc = span.set_attribute - instrumentation_name = self._extract_instrumentation_name(span) - # is_httpx = self._is_httpx_instrumentation(instrumentation_name) - - # if is_httpx: + self._extract_instrumentation_name(span) self._check_and_mark_blocked_url(span, original_set_attribute) def wrapped_set_attribute(key: str, value: Any) -> None: diff --git a/netra/version.py b/netra/version.py index a3f3cf2..bd9f1fc 100644 --- a/netra/version.py +++ b/netra/version.py @@ -1 +1 @@ -__version__ = "0.1.76" +__version__ = "0.1.77" diff --git a/pyproject.toml b/pyproject.toml index a487623..a4b33de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [project] name = "netra-sdk" -version = "0.1.76" +version = "0.1.77" description = "A Python SDK for AI application observability that provides OpenTelemetry-based monitoring, tracing, and PII protection for LLM and vector database applications. Enables easy instrumentation, session tracking, and privacy-focused data collection for AI systems in production environments." authors = [ {name = "Sooraj Thomas",email = "sooraj@keyvalue.systems"} From ff8d1aa83860027e3996c90c6cae73d5e4b4819c Mon Sep 17 00:00:00 2001 From: Akash Vijay Date: Mon, 23 Mar 2026 15:01:21 +0530 Subject: [PATCH 4/5] [NET-447] fix: Bug in allowing netra spawned spans from root instrument filter (#232) --- netra/__init__.py | 8 ----- .../root_instrument_filter_processor.py | 35 +++++++++++++++++-- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/netra/__init__.py b/netra/__init__.py index 9189f2a..c301b21 100644 --- a/netra/__init__.py +++ b/netra/__init__.py @@ -143,14 +143,6 @@ def init( # Initialize tracer (OTLP exporter, span processor, resource) Tracer(cfg, root_instrument_names=resolved_root) - # Initialize metrics pipeline when explicitly enabled - if cfg.enable_metrics: - try: - MetricsSetup(cfg) - cls._metrics_enabled = True - except Exception as e: - logger.warning("Failed to initialize metrics pipeline: %s", e, exc_info=True) - # Initialize metrics pipeline when explicitly enabled if cfg.enable_metrics: try: diff --git a/netra/processors/root_instrument_filter_processor.py b/netra/processors/root_instrument_filter_processor.py index 9b564e4..67cad5f 100644 --- a/netra/processors/root_instrument_filter_processor.py +++ b/netra/processors/root_instrument_filter_processor.py @@ -12,6 +12,9 @@ # Attribute written on blocked spans so that the FilteringSpanExporter drops them. _LOCAL_BLOCKED_ATTR = "netra.local_blocked" +# Scope-name prefixes that identify auto-instrumentation libraries. +_INSTRUMENTATION_PREFIXES = ("opentelemetry.instrumentation.", "netra.instrumentation.") + class RootInstrumentFilterProcessor(SpanProcessor): # type: ignore[misc] """Blocks root spans (and their entire subtree) from instrumentations not in @@ -118,7 +121,12 @@ def _process_span_start( self._mark_blocked(span) return - # Root span – check instrumentation name against the allow-list. + # Root span – only apply the allow-list to auto-instrumentation spans. + # Spans created directly through netra (decorators / Netra.start_span) + # use arbitrary tracer names and must never be blocked. + if not self._is_from_instrumentation_library(span): + return + instr_name = self._extract_instrumentation_name(span) if instr_name is not None and instr_name not in self._allowed: own_id = self._get_span_id(span) @@ -179,6 +187,29 @@ def _mark_blocked(span: Span) -> None: except Exception: pass + @staticmethod + def _is_from_instrumentation_library(span: Span) -> bool: + """Return ``True`` if the span originates from a known auto-instrumentation library. + + Spans created by netra decorators or ``Netra.start_span`` use arbitrary + tracer names that do not match the instrumentation naming convention and + will return ``False``. + + Args: + span: The span to check. + + Returns: + ``True`` when the span's instrumentation scope starts with a known + instrumentation prefix, ``False`` otherwise. + """ + scope = getattr(span, "instrumentation_scope", None) + if scope is None: + return False + name = getattr(scope, "name", None) + if not isinstance(name, str) or not name: + return False + return name.startswith(_INSTRUMENTATION_PREFIXES) + @staticmethod def _extract_instrumentation_name(span: Span) -> Optional[str]: """ @@ -198,7 +229,7 @@ def _extract_instrumentation_name(span: Span) -> Optional[str]: name = getattr(scope, "name", None) if not isinstance(name, str) or not name: return None - for prefix in ("opentelemetry.instrumentation.", "netra.instrumentation."): + for prefix in _INSTRUMENTATION_PREFIXES: if name.startswith(prefix): base = name.rsplit(".", 1)[-1].strip() return base if base else name From eb23dc305300f3deacb25c3d3f689f5806c7af6d Mon Sep 17 00:00:00 2001 From: Nithish-KV Date: Thu, 26 Mar 2026 10:20:40 +0530 Subject: [PATCH 5/5] [NET-447] fix: Disable Traceloop requests instrumentation --- CHANGELOG.md | 7 ++++++- netra/instrumentation/__init__.py | 1 + netra/version.py | 2 +- pyproject.toml | 2 +- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 97457cb..0ac6367 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on Keep a Changelog and this project adheres to Semantic Versioning. +## [0.1.78] - 2026-03-26 + +- Block requests instrumentation from traceloop + + ## [0.1.77] - 2026-03-23 - Cleanup and refactor claude agent sdk instrumentation @@ -216,4 +221,4 @@ The format is based on Keep a Changelog and this project adheres to Semantic Ver - Added utility to set input and output data for any active span in a trace -[0.1.77]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main +[0.1.78]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main diff --git a/netra/instrumentation/__init__.py b/netra/instrumentation/__init__.py index 0d073f9..d55eea3 100644 --- a/netra/instrumentation/__init__.py +++ b/netra/instrumentation/__init__.py @@ -61,6 +61,7 @@ def init_instrumentations( Instruments.GROQ, Instruments.REDIS, Instruments.PYMYSQL, + Instruments.REQUESTS, } ) diff --git a/netra/version.py b/netra/version.py index bd9f1fc..19a9250 100644 --- a/netra/version.py +++ b/netra/version.py @@ -1 +1 @@ -__version__ = "0.1.77" +__version__ = "0.1.78" diff --git a/pyproject.toml b/pyproject.toml index a4b33de..4b29f2a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [project] name = "netra-sdk" -version = "0.1.77" +version = "0.1.78" description = "A Python SDK for AI application observability that provides OpenTelemetry-based monitoring, tracing, and PII protection for LLM and vector database applications. Enables easy instrumentation, session tracking, and privacy-focused data collection for AI systems in production environments." authors = [ {name = "Sooraj Thomas",email = "sooraj@keyvalue.systems"}