From 8a0383da8ccc43b8e9a5b458a31703d992af21e9 Mon Sep 17 00:00:00 2001 From: Cagri Yonca Date: Tue, 3 Feb 2026 11:29:16 +0100 Subject: [PATCH] feat: Add span filtering feature Signed-off-by: Cagri Yonca --- src/instana/agent/host.py | 87 ++-- .../kafka/confluent_kafka_python.py | 21 +- .../instrumentation/kafka/kafka_python.py | 25 +- src/instana/options.py | 114 +++-- src/instana/util/config.py | 297 +++++++----- src/instana/util/span_utils.py | 103 +++- tests/agent/test_host.py | 54 ++- tests/clients/boto3/test_boto3_dynamodb.py | 12 +- tests/clients/kafka/test_confluent_kafka.py | 78 ++- tests/clients/kafka/test_kafka_python.py | 87 +++- tests/clients/test_redis.py | 49 +- tests/test_options.py | 457 ++++++++++++------ tests/util/test_config.py | 214 ++++---- tests/util/test_config_reader.py | 153 ++++-- tests/util/test_configuration-1.yaml | 57 ++- tests/util/test_configuration-2.yaml | 38 +- tests/util/test_span_utils.py | 130 ++++- 17 files changed, 1350 insertions(+), 626 deletions(-) diff --git a/src/instana/agent/host.py b/src/instana/agent/host.py index 9ecc74ca..112ff6a1 100644 --- a/src/instana/agent/host.py +++ b/src/instana/agent/host.py @@ -22,7 +22,7 @@ from instana.options import StandardOptions from instana.util import to_json from instana.util.runtime import get_py_source, log_runtime_env_info -from instana.util.span_utils import get_operation_specifiers +from instana.util.span_utils import matches_rule from instana.version import VERSION if TYPE_CHECKING: @@ -357,51 +357,58 @@ def report_spans(self, payload: Dict[str, Any]) -> Optional[Response]: def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ - Filters given span list using ignore-endpoint variable and returns the list of filtered spans. + Filters span list using new hierarchical filtering rules. """ filtered_spans = [] - endpoint = "" + for span in spans: - if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"): - service = span.n - operation_specifier_key, service_specifier_key = ( - get_operation_specifiers(service) - ) - if service == "kafka": - endpoint = span.data[service][service_specifier_key] - method = span.data[service][operation_specifier_key] - if isinstance(method, str) and self.__is_endpoint_ignored( - service, method, endpoint - ): - continue - else: - filtered_spans.append(span) - else: + if not (hasattr(span, "n") or hasattr(span, "name")) or not hasattr( + span, "data" + ): filtered_spans.append(span) + continue + + service_name = "" + + # Set the service name + for span_value in span.data.keys(): + if isinstance(span.data[span_value], dict): + service_name = span_value + + # Set span attributes for filtering + attributes_to_check = { + "type": service_name, + "kind": span.k, + } + + # Add operation specifiers to the attributes + for key, value in span.data[service_name].items(): + attributes_to_check[f"{service_name}.{key}"] = value + + # Check if the span need to be ignored + if self.__is_endpoint_ignored(attributes_to_check): + continue + + filtered_spans.append(span) + return filtered_spans - def __is_endpoint_ignored( - self, - service: str, - method: str = "", - endpoint: str = "", - ) -> bool: - """Check if the given service and endpoint combination should be ignored.""" - service = service.lower() - method = method.lower() - endpoint = endpoint.lower() - filter_rules = [ - f"{service}.{method}", # service.method - f"{service}.*", # service.* - ] - - if service == "kafka" and endpoint: - filter_rules += [ - f"{service}.{method}.{endpoint}", # service.method.endpoint - f"{service}.*.{endpoint}", # service.*.endpoint - f"{service}.{method}.*", # service.method.* - ] - return any(rule in self.options.ignore_endpoints for rule in filter_rules) + def __is_endpoint_ignored(self, span_attributes: dict) -> bool: + filters = self.options.span_filters + if not filters: + return False + + # Check include rules + for rule in filters.get("include", [{}]): + if matches_rule(rule.get("attributes", []), span_attributes): + return False + + # Check exclude rules + for rule in filters.get("exclude", [{}]): + if matches_rule(rule.get("attributes", []), span_attributes): + return True + + return False def handle_agent_tasks(self, task: Dict[str, Any]) -> None: """ diff --git a/src/instana/instrumentation/kafka/confluent_kafka_python.py b/src/instana/instrumentation/kafka/confluent_kafka_python.py index 3b226ec0..e622e661 100644 --- a/src/instana/instrumentation/kafka/confluent_kafka_python.py +++ b/src/instana/instrumentation/kafka/confluent_kafka_python.py @@ -74,10 +74,15 @@ def trace_kafka_produce( # Get the topic from either args or kwargs topic = args[0] if args else kwargs.get("topic", "") + attributes_to_check = { + "type": "kafka", + "kind": "exit", + "kafka.service": topic, + "kafka.access": "produce", + } + is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( - "kafka", - "produce", - topic, + attributes_to_check ) with tracer.start_as_current_span( @@ -137,10 +142,14 @@ def create_span( is_suppressed = False if topic: + attributes_to_check = { + "type": "kafka", + "kind": "entry", + "kafka.service": topic, + "kafka.access": span_type, + } is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( - "kafka", - span_type, - topic, + attributes_to_check ) if not is_suppressed and headers: diff --git a/src/instana/instrumentation/kafka/kafka_python.py b/src/instana/instrumentation/kafka/kafka_python.py index 25b05e13..fd28677d 100644 --- a/src/instana/instrumentation/kafka/kafka_python.py +++ b/src/instana/instrumentation/kafka/kafka_python.py @@ -39,20 +39,26 @@ def trace_kafka_send( # Get the topic from either args or kwargs topic = args[0] if args else kwargs.get("topic", "") + attributes_to_check = { + "type": "kafka", + "kind": "exit", + "kafka.service": topic, + "kafka.access": "send", + } is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( - "kafka", - "send", - topic, + attributes_to_check ) + with tracer.start_as_current_span( "kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER ) as span: span.set_attribute("kafka.service", topic) span.set_attribute("kafka.access", "send") - # context propagation + # Context propagation headers = kwargs.get("headers", []) + if not is_suppressed and ("x_instana_l_s", b"0") in headers: is_suppressed = True @@ -70,6 +76,7 @@ def trace_kafka_send( if tracer.exporter.options.kafka_trace_correlation: kwargs["headers"] = headers + try: res = wrapped(*args, **kwargs) return res @@ -94,10 +101,14 @@ def create_span( is_suppressed = False if topic: + attributes_to_check = { + "type": "kafka", + "kind": "entry", + "kafka.service": topic, + "kafka.access": span_type, + } is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( - "kafka", - span_type, - topic, + attributes_to_check ) if not is_suppressed and headers: diff --git a/src/instana/options.py b/src/instana/options.py index a5651db1..12afc710 100644 --- a/src/instana/options.py +++ b/src/instana/options.py @@ -27,9 +27,10 @@ get_disable_trace_configurations_from_yaml, get_stack_trace_config_from_yaml, is_truthy, - parse_ignored_endpoints, - parse_ignored_endpoints_from_yaml, + parse_filtered_endpoints, + parse_filtered_endpoints_from_yaml, parse_span_disabling, + parse_span_filter_env_vars, parse_technology_stack_trace_config, validate_stack_trace_length, validate_stack_trace_level, @@ -46,7 +47,7 @@ def __init__(self, **kwds: Dict[str, Any]) -> None: self.service_name = determine_service_name() self.extra_http_headers = None self.allow_exit_as_root = False - self.ignore_endpoints = [] + self.span_filters = {} self.kafka_trace_correlation = True # disabled_spans lists all categories and types that should be disabled @@ -108,20 +109,18 @@ def set_trace_configurations(self) -> None: # The priority is as follows: # environment variables > in-code configuration > # > agent config (configuration.yaml) > default value - if "INSTANA_IGNORE_ENDPOINTS" in os.environ: - self.ignore_endpoints = parse_ignored_endpoints( - os.environ["INSTANA_IGNORE_ENDPOINTS"] + if any(k.startswith("INSTANA_TRACING_FILTER_") for k in os.environ): + # Check for new span filtering env vars + parsed_filter = parse_span_filter_env_vars() + if parsed_filter["exclude"] or parsed_filter["include"]: + self.span_filters = parsed_filter + elif "INSTANA_CONFIG_PATH" in os.environ: + self.span_filters = parse_filtered_endpoints_from_yaml( + os.environ["INSTANA_CONFIG_PATH"] ) - elif "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ: - self.ignore_endpoints = parse_ignored_endpoints_from_yaml( - os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"] - ) - elif ( - isinstance(config.get("tracing"), dict) - and "ignore_endpoints" in config["tracing"] - ): - self.ignore_endpoints = parse_ignored_endpoints( - config["tracing"]["ignore_endpoints"], + elif isinstance(config.get("tracing"), dict) and "filter" in config["tracing"]: + self.span_filters = parse_filtered_endpoints( + config["tracing"]["filter"], ) if "INSTANA_KAFKA_TRACE_CORRELATION" in os.environ: @@ -146,7 +145,8 @@ def _apply_env_stack_trace_config(self) -> None: if "INSTANA_STACK_TRACE_LENGTH" in os.environ: if validated_length := validate_stack_trace_length( - os.environ["INSTANA_STACK_TRACE_LENGTH"], "from INSTANA_STACK_TRACE_LENGTH" + os.environ["INSTANA_STACK_TRACE_LENGTH"], + "from INSTANA_STACK_TRACE_LENGTH", ): self.stack_trace_length = validated_length @@ -161,31 +161,41 @@ def _apply_yaml_stack_trace_config(self) -> None: def _apply_in_code_stack_trace_config(self) -> None: """Apply stack trace configuration from in-code config.""" - if not isinstance(config.get("tracing"), dict) or "global" not in config["tracing"]: + if ( + not isinstance(config.get("tracing"), dict) + or "global" not in config["tracing"] + ): return - + global_config = config["tracing"]["global"] - + if "INSTANA_STACK_TRACE" not in os.environ and "stack_trace" in global_config: - if validated_level := validate_stack_trace_level(global_config["stack_trace"], "from in-code config"): + if validated_level := validate_stack_trace_level( + global_config["stack_trace"], "from in-code config" + ): self.stack_trace_level = validated_level - - if "INSTANA_STACK_TRACE_LENGTH" not in os.environ and "stack_trace_length" in global_config: - if validated_length := validate_stack_trace_length(global_config["stack_trace_length"], "from in-code config"): + + if ( + "INSTANA_STACK_TRACE_LENGTH" not in os.environ + and "stack_trace_length" in global_config + ): + if validated_length := validate_stack_trace_length( + global_config["stack_trace_length"], "from in-code config" + ): self.stack_trace_length = validated_length - + # Technology-specific overrides from in-code config for tech_name, tech_data in config["tracing"].items(): if tech_name == "global" or not isinstance(tech_data, dict): continue - + tech_stack_config = parse_technology_stack_trace_config( tech_data, level_key="stack_trace", length_key="stack_trace_length", - tech_name=tech_name + tech_name=tech_name, ) - + if tech_stack_config: self.stack_trace_technology_config[tech_name] = tech_stack_config @@ -196,7 +206,7 @@ def set_stack_trace_configurations(self) -> None: """ # 1. Environment variables (highest priority) self._apply_env_stack_trace_config() - + # 2. INSTANA_CONFIG_PATH (YAML file) - includes tech-specific overrides if "INSTANA_CONFIG_PATH" in os.environ: self._apply_yaml_stack_trace_config() @@ -261,10 +271,10 @@ def get_stack_trace_config(self, span_name: str) -> Tuple[str, int]: """ Get stack trace configuration for a specific span type. Technology-specific configuration overrides global configuration. - + Args: span_name: The name of the span (e.g., "kafka-producer", "redis", "mysql") - + Returns: Tuple of (level, length) where: - level: "all", "error", or "none" @@ -273,17 +283,17 @@ def get_stack_trace_config(self, span_name: str) -> Tuple[str, int]: # Start with global defaults level = self.stack_trace_level length = self.stack_trace_length - + # Check for technology-specific overrides # Extract base technology name from span name # Examples: "kafka-producer" -> "kafka", "mysql" -> "mysql" tech_name = span_name.split("-")[0] if "-" in span_name else span_name - + if tech_name in self.stack_trace_technology_config: tech_config = self.stack_trace_technology_config[tech_name] level = tech_config.get("level", level) length = tech_config.get("length", length) - + return level, length @@ -331,8 +341,8 @@ def set_tracing(self, tracing: Dict[str, Any]) -> None: @param tracing: tracing configuration dictionary @return: None """ - if "ignore-endpoints" in tracing and not self.ignore_endpoints: - self.ignore_endpoints = parse_ignored_endpoints(tracing["ignore-endpoints"]) + if "filter" in tracing and not self.span_filters: + self.span_filters = parse_filtered_endpoints(tracing["filter"]) if "kafka" in tracing: if ( @@ -361,7 +371,7 @@ def set_tracing(self, tracing: Dict[str, Any]) -> None: # Handle span disabling configuration if "disable" in tracing: self.set_disable_tracing(tracing["disable"]) - + # Handle stack trace configuration from agent config self.set_stack_trace_from_agent(tracing) @@ -375,19 +385,27 @@ def _should_apply_agent_global_config(self) -> bool: has_in_code_config = ( isinstance(config.get("tracing"), dict) and "global" in config["tracing"] - and ("stack_trace" in config["tracing"]["global"] - or "stack_trace_length" in config["tracing"]["global"]) + and ( + "stack_trace" in config["tracing"]["global"] + or "stack_trace_length" in config["tracing"]["global"] + ) ) return not (has_env_vars or has_yaml_config or has_in_code_config) - def _apply_agent_global_stack_trace_config(self, global_config: Dict[str, Any]) -> None: + def _apply_agent_global_stack_trace_config( + self, global_config: Dict[str, Any] + ) -> None: """Apply global stack trace configuration from agent config.""" if "stack-trace" in global_config: - if validated_level := validate_stack_trace_level(global_config["stack-trace"], "in agent config"): + if validated_level := validate_stack_trace_level( + global_config["stack-trace"], "in agent config" + ): self.stack_trace_level = validated_level - + if "stack-trace-length" in global_config: - if validated_length := validate_stack_trace_length(global_config["stack-trace-length"], "in agent config"): + if validated_length := validate_stack_trace_length( + global_config["stack-trace-length"], "in agent config" + ): self.stack_trace_length = validated_length def _apply_agent_tech_stack_trace_config(self, tracing: Dict[str, Any]) -> None: @@ -395,14 +413,14 @@ def _apply_agent_tech_stack_trace_config(self, tracing: Dict[str, Any]) -> None: for tech_name, tech_config in tracing.items(): if tech_name == "global" or not isinstance(tech_config, dict): continue - + tech_stack_config = parse_technology_stack_trace_config( tech_config, level_key="stack-trace", length_key="stack-trace-length", - tech_name=tech_name + tech_name=tech_name, ) - + if tech_stack_config: self.stack_trace_technology_config[tech_name] = tech_stack_config @@ -410,13 +428,13 @@ def set_stack_trace_from_agent(self, tracing: Dict[str, Any]) -> None: """ Set stack trace configuration from agent config (configuration.yaml). Only applies if not already set by higher priority sources. - + @param tracing: tracing configuration dictionary from agent """ # Apply global config if no higher priority source exists if self._should_apply_agent_global_config() and "global" in tracing: self._apply_agent_global_stack_trace_config(tracing["global"]) - + # Apply technology-specific config if not already set by YAML or in-code config if not self.stack_trace_technology_config: self._apply_agent_tech_stack_trace_config(tracing) diff --git a/src/instana/util/config.py b/src/instana/util/config.py index a4bbe7a6..9ec951b0 100644 --- a/src/instana/util/config.py +++ b/src/instana/util/config.py @@ -1,6 +1,5 @@ # (c) Copyright IBM Corp. 2025 -import itertools import os from typing import Any, Dict, List, Sequence, Tuple, Union, Optional @@ -9,9 +8,7 @@ from instana.util.config_reader import ConfigReader # Constants -DEPRECATED_CONFIG_KEY_WARNING = ( - 'Please use "tracing" instead of "com.instana.tracing" for local configuration file.' -) +DEPRECATED_CONFIG_KEY_WARNING = 'Please use "tracing" instead of "com.instana.tracing" for local configuration file.' # List of supported span categories (technology or protocol) SPAN_CATEGORIES = [ @@ -66,7 +63,7 @@ def parse_service_pair(pair: str) -> List[str]: return pair_list -def parse_ignored_endpoints_string(params: Union[str, os.PathLike]) -> List[str]: +def parse_filtered_endpoints_string(params: Union[str, os.PathLike]) -> List[str]: """ Parses a string to prepare a list of ignored endpoints. @@ -74,85 +71,83 @@ def parse_ignored_endpoints_string(params: Union[str, os.PathLike]) -> List[str] - "service1:method1,method2;service2:method3" or "service1;service2" @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ - ignore_endpoints = [] + span_filters = [] if params: service_pairs = params.lower().split(";") for pair in service_pairs: if pair.strip(): - ignore_endpoints += parse_service_pair(pair) - return ignore_endpoints + span_filters += parse_service_pair(pair) + return span_filters -def parse_ignored_endpoints_dict(params: Dict[str, Any]) -> List[str]: +def parse_filtered_endpoints_dict(filter_dict: dict[str, Any]) -> dict[str, list[Any]]: """ - Parses a dictionary to prepare a list of ignored endpoints. + Parses 'exclude' and 'include' blocks from the filter dict. - @param params: Dict format: - - {"service1": ["method1", "method2"], "service2": ["method3"]} - @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] - """ - ignore_endpoints = [] - - for service, methods in params.items(): - if not methods: # filtering all service - ignore_endpoints.append(f"{service.lower()}.*") - else: # filtering specific endpoints - ignore_endpoints = parse_endpoints_of_service( - ignore_endpoints, service, methods - ) - - return ignore_endpoints - - -def parse_endpoints_of_service( - ignore_endpoints: List[str], - service: str, - methods: Union[str, List[str]], -) -> List[str]: + @param filter_dict: config_reader.data["com.instana.tracing"].get("filter") + @return: Dict containing parsed rules for both exclude and include """ - Parses endpoints of each service. + parsed_config = {"exclude": [], "include": []} - @param ignore_endpoints: A list of rules for endpoints to be filtered. - @param service: The name of the service to be filtered. - @param methods: A list of specific endpoints of the service to be filtered. - """ - if service == "kafka" and isinstance(methods, list): - for rule in methods: - ignore_endpoints.extend(parse_kafka_methods(rule)) - else: - for method in methods: - ignore_endpoints.append(f"{service.lower()}.{method.lower()}") - return ignore_endpoints - - -def parse_kafka_methods(rule: Union[str, Dict[str, any]]) -> List[str]: - parsed_rule = [] - if isinstance(rule, dict): - for method, endpoint in itertools.product(rule["methods"], rule["endpoints"]): - parsed_rule.append(f"kafka.{method.lower()}.{endpoint.lower()}") - elif isinstance(rule, list): - for method in rule: - parsed_rule.append(f"kafka.{method.lower()}.*") - else: - parsed_rule.append(f"kafka.{rule.lower()}.*") - return parsed_rule + if not filter_dict or not isinstance(filter_dict, dict): + return parsed_config + # Disable filtering + if filter_dict.get("deactivate", False): + return parsed_config -def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]: + try: + for mode in ["exclude", "include"]: + raw_filters = filter_dict.get(mode, []) + + if not isinstance(raw_filters, list): + continue + + for item in raw_filters: + entry = { + "name": item.get("name", "unnamed"), + # Add suppression only for exclude mode + "suppression": item.get("suppression", True) + if mode == "exclude" + else None, + "attributes": [], + } + + attributes = item.get("attributes", []) + if isinstance(attributes, list): + for attr in attributes: + attr_data = { + "key": attr.get("key"), + "values": attr.get("values", []), + # match_type default: strict + "match_type": attr.get("match_type", "strict"), + } + entry["attributes"].append(attr_data) + + parsed_config[mode].append(entry) + + return parsed_config + except Exception: + return {"exclude": [], "include": []} + + +def parse_filtered_endpoints( + params: Union[Dict[str, Any], str], +) -> Union[List[str], dict[str, list[Any]]]: """ Parses input to prepare a list for ignored endpoints. @param params: Can be either: - String: "service1:method1,method2;service2:method3" or "service1;service2" - - Dict: {"service1": ["method1", "method2"], "service2": ["method3"]} + - Dict: {"exclude": [{"name": "foo", "attributes": ...}], "include": []} @return: List of strings in format ["service1.method1", "service1.method2", "service2.*"] """ try: if isinstance(params, str): - return parse_ignored_endpoints_string(params) + return parse_filtered_endpoints_string(params) elif isinstance(params, dict): - return parse_ignored_endpoints_dict(params) + return parse_filtered_endpoints_dict(params) else: return [] except Exception as e: @@ -160,7 +155,9 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]: return [] -def parse_ignored_endpoints_from_yaml(file_path: str) -> List[str]: +def parse_filtered_endpoints_from_yaml( + file_path: str, +) -> Union[List[str], dict[str, list[Any]]]: """ Parses configuration yaml file and prepares a list of ignored endpoints. @@ -168,21 +165,96 @@ def parse_ignored_endpoints_from_yaml(file_path: str) -> List[str]: @return: List of strings in format ["service1.method1", "service1.method2", "service2.*", "kafka.method.topic", "kafka.*.topic", "kafka.method.*"] """ config_reader = ConfigReader(file_path) - ignore_endpoints_dict = None + span_filters_dict = None if "tracing" in config_reader.data: - ignore_endpoints_dict = config_reader.data["tracing"].get("ignore-endpoints") + span_filters_dict = config_reader.data["tracing"].get("filter") elif "com.instana.tracing" in config_reader.data: logger.warning(DEPRECATED_CONFIG_KEY_WARNING) - ignore_endpoints_dict = config_reader.data["com.instana.tracing"].get( - "ignore-endpoints" - ) - if ignore_endpoints_dict: - ignored_endpoints = parse_ignored_endpoints(ignore_endpoints_dict) - return ignored_endpoints + span_filters_dict = config_reader.data["com.instana.tracing"].get("filter") + if span_filters_dict: + span_filters = parse_filtered_endpoints(span_filters_dict) + return span_filters else: return [] +def parse_span_filter_env_vars() -> Dict[str, List[Any]]: + """ + Parses INSTANA_TRACING_FILTER___ATTRIBUTES environment variables. + + @return: Dict containing parsed rules for both exclude and include + """ + parsed_config = {"exclude": [], "include": []} + + # Intermediate storage: { "exclude": { "name": { "suppression": ..., "attributes": [] } } } + intermediate = {"exclude": {}, "include": {}} + + for env_key, env_value in os.environ.items(): + if not env_key.startswith("INSTANA_TRACING_FILTER_"): + continue + + parts = env_key.split("_") + + if len(parts) < 5: + continue + + policy = parts[3].lower() + if policy not in ["exclude", "include"]: + continue + + suffix = parts[-1] + name = "_".join(parts[4:-1]) + + if not name: + continue + + if name not in intermediate[policy]: + intermediate[policy][name] = { + "name": name, + "attributes": [], + "suppression": None, + } + + if suffix == "ATTRIBUTES": + # Rule format: key;values;match_type|key;values;match_type + rules = env_value.split("|") + for rule in rules: + rule_parts = rule.split(";") + if len(rule_parts) < 2: + continue + + key = rule_parts[0].strip() + values_str = rule_parts[1] + match_type = ( + rule_parts[2].strip().lower() if len(rule_parts) > 2 else "strict" + ) + + # Split values by comma (simple split, assuming no commas in values or user handles escaping if needed?) + # Spec says "values": Mandatory - List of Strings. + # Env var examples: "http.target;/health" -> values=["/health"] + # "kafka.service;topic1,topic2;strict" -> values=["topic1", "topic2"] + values = [v.strip() for v in values_str.split(",") if v.strip()] + + attr_data = {"key": key, "values": values, "match_type": match_type} + intermediate[policy][name]["attributes"].append(attr_data) + + elif suffix == "SUPPRESSION" and policy == "exclude": + intermediate[policy][name]["suppression"] = is_truthy(env_value) + + # Convert intermediate to final list format + for mode in ["exclude", "include"]: + for name, data in intermediate[mode].items(): + # If suppression not set for exclude, default to True (as per YAML spec) + if mode == "exclude" and data["suppression"] is None: + data["suppression"] = True + + # Attributes are mandatory + if data["attributes"]: + parsed_config[mode].append(data) + + return parsed_config + + def is_truthy(value: Any) -> bool: """ Check if a value is truthy, accepting various formats. @@ -302,10 +374,10 @@ def get_tracing_root_key(config_data: Dict[str, Any]) -> Optional[str]: """ Get the root key for tracing configuration from config data. Handles both 'tracing' and deprecated 'com.instana.tracing' keys. - + Args: config_data: Configuration data dictionary - + Returns: Root key string or None if not found """ @@ -319,7 +391,7 @@ def get_tracing_root_key(config_data: Dict[str, Any]) -> Optional[str]: def get_disable_trace_configurations_from_yaml() -> Tuple[List[str], List[str]]: config_reader = ConfigReader(os.environ.get("INSTANA_CONFIG_PATH", "")) - + root_key = get_tracing_root_key(config_reader.data) if not root_key: return [], [] @@ -339,18 +411,18 @@ def get_disable_trace_configurations_from_local() -> Tuple[List[str], List[str]] def validate_stack_trace_level(level_value: Any, context: str = "") -> Optional[str]: """ Validate stack trace level value. - + Args: level_value: The level value to validate context: Context string for error messages (e.g., "for kafka", "in agent config") - + Returns: Validated level string ("all", "error", or "none"), or None if invalid """ level = str(level_value).lower() if level in ["all", "error", "none"]: return level - + context_msg = f" {context}" if context else "" logger.warning( f"Invalid stack-trace value{context_msg}: {level}. Must be 'all', 'error', or 'none'. Using default 'all'." @@ -361,11 +433,11 @@ def validate_stack_trace_level(level_value: Any, context: str = "") -> Optional[ def validate_stack_trace_length(length_value: Any, context: str = "") -> Optional[int]: """ Validate stack trace length value. - + Args: length_value: The length value to validate context: Context string for error messages (e.g., "for kafka", "in agent config") - + Returns: Validated length integer (>= 1), or None if invalid """ @@ -373,7 +445,7 @@ def validate_stack_trace_length(length_value: Any, context: str = "") -> Optiona length = int(length_value) if length >= 1: return length - + context_msg = f" {context}" if context else "" logger.warning( f"stack-trace-length{context_msg} must be positive. Using default 30." @@ -395,89 +467,97 @@ def parse_technology_stack_trace_config( ) -> Dict[str, Union[str, int]]: """ Parse technology-specific stack trace configuration from a dictionary. - + Args: tech_data: Dictionary containing stack trace configuration level_key: Key name for level configuration (e.g., "stack-trace" or "stack_trace") length_key: Key name for length configuration (e.g., "stack-trace-length" or "stack_trace_length") tech_name: Technology name for error messages (e.g., "kafka", "redis") - + Returns: Dictionary with "level" and/or "length" keys, or empty dict if no valid config """ tech_stack_config = {} context = f"for {tech_name}" if tech_name else "" - + if level_key in tech_data: if validated_level := validate_stack_trace_level(tech_data[level_key], context): tech_stack_config["level"] = validated_level - + if length_key in tech_data: - if validated_length := validate_stack_trace_length(tech_data[length_key], context): + if validated_length := validate_stack_trace_length( + tech_data[length_key], context + ): tech_stack_config["length"] = validated_length - + return tech_stack_config def parse_global_stack_trace_config(global_config: Dict[str, Any]) -> Tuple[str, int]: """ Parse global stack trace configuration from a config dictionary. - + Args: global_config: Global configuration dictionary - + Returns: Tuple of (level, length) with defaults if not found """ level = "all" length = 30 - + if "stack-trace" in global_config: - if validated_level := validate_stack_trace_level(global_config["stack-trace"], "in YAML config"): + if validated_level := validate_stack_trace_level( + global_config["stack-trace"], "in YAML config" + ): level = validated_level - + if "stack-trace-length" in global_config: - if validated_length := validate_stack_trace_length(global_config["stack-trace-length"], "in YAML config"): + if validated_length := validate_stack_trace_length( + global_config["stack-trace-length"], "in YAML config" + ): length = validated_length - + return level, length def parse_tech_specific_stack_trace_configs( - tracing_data: Dict[str, Any] + tracing_data: Dict[str, Any], ) -> Dict[str, Dict[str, Union[str, int]]]: """ Parse technology-specific stack trace configurations from tracing data. - + Args: tracing_data: Tracing configuration dictionary - + Returns: Dictionary of technology-specific overrides """ tech_config = {} - + for tech_name, tech_data in tracing_data.items(): if tech_name == "global" or not isinstance(tech_data, dict): continue - + tech_stack_config = parse_technology_stack_trace_config( tech_data, level_key="stack-trace", length_key="stack-trace-length", - tech_name=tech_name + tech_name=tech_name, ) - + if tech_stack_config: tech_config[tech_name] = tech_stack_config - + return tech_config -def get_stack_trace_config_from_yaml() -> Tuple[str, int, Dict[str, Dict[str, Union[str, int]]]]: +def get_stack_trace_config_from_yaml() -> ( + Tuple[str, int, Dict[str, Dict[str, Union[str, int]]]] +): """ Get stack trace configuration from YAML file specified by INSTANA_CONFIG_PATH. - + Returns: Tuple of (level, length, tech_config) where: - level: "all", "error", or "none" @@ -486,24 +566,25 @@ def get_stack_trace_config_from_yaml() -> Tuple[str, int, Dict[str, Dict[str, Un Format: {"kafka": {"level": "all", "length": 35}, "redis": {"level": "none"}} """ config_reader = ConfigReader(os.environ.get("INSTANA_CONFIG_PATH", "")) - + level = "all" length = 30 tech_config = {} - + root_key = get_tracing_root_key(config_reader.data) if not root_key: return level, length, tech_config - + tracing_data = config_reader.data[root_key] - + # Read global configuration if "global" in tracing_data: level, length = parse_global_stack_trace_config(tracing_data["global"]) - + # Read technology-specific overrides tech_config = parse_tech_specific_stack_trace_configs(tracing_data) - + return level, length, tech_config + # Made with Bob diff --git a/src/instana/util/span_utils.py b/src/instana/util/span_utils.py index 2dda4759..656e59bb 100644 --- a/src/instana/util/span_utils.py +++ b/src/instana/util/span_utils.py @@ -1,17 +1,90 @@ # (c) Copyright IBM Corp. 2025 -from typing import Tuple - - -def get_operation_specifiers(span_name: str) -> Tuple[str, str]: - """Get the specific operation specifier for the given span.""" - operation_specifier_key = "" - service_specifier_key = "" - if span_name == "redis": - operation_specifier_key = "command" - elif span_name == "dynamodb": - operation_specifier_key = "op" - elif span_name == "kafka": - operation_specifier_key = "access" - service_specifier_key = "service" - return operation_specifier_key, service_specifier_key + +from typing import Any, List + +from instana.util.config import SPAN_TYPE_TO_CATEGORY + + +def matches_rule(rule_attributes: List[Any], span_attributes: List[Any]) -> bool: + """Check if the span attributes match the rule attributes.""" + for attr_rule in rule_attributes: + key = attr_rule.get("key") + target_values = attr_rule.get("values", []) + match_type = attr_rule.get("match_type", "strict") + + rule_matched = False + + if key == "category": + if ( + "type" in span_attributes + and span_attributes["type"] in SPAN_TYPE_TO_CATEGORY + ): + actual = SPAN_TYPE_TO_CATEGORY[span_attributes["type"]] + if actual in target_values: + rule_matched = True + + elif key == "kind": + if "kind" in span_attributes: + actual_kind = get_span_kind(span_attributes["kind"]) + if actual_kind in target_values: + rule_matched = True + + elif key == "type": + if "type" in span_attributes: + if span_attributes["type"] in target_values: + rule_matched = True + + else: + if key in span_attributes: + attribute_value = span_attributes[key] + for target_value in target_values: + if match_key_filter(attribute_value, target_value, match_type): + rule_matched = True + break + + if not rule_matched: + return False + + return True + + +def match_key_filter(first_value: str, second_value: str, match_type: str) -> bool: + """Check if the first value matches the second value based on the match type.""" + if second_value == "*": + return True + elif match_type == "strict" and first_value == second_value: + return True + elif match_type == "contains" and second_value in first_value: + return True + elif match_type == "startswith" and first_value.startswith(second_value): + return True + elif match_type == "endswith" and first_value.endswith(second_value): + return True + + return False + + +def get_span_kind(span_kind: Any) -> str: + res = "intermediate" + + val = span_kind + if hasattr(span_kind, "value"): + val = span_kind.value + + try: + k = int(val) + if k == 1: + res = "entry" + elif k == 2: + res = "exit" + except (ValueError, TypeError): + pass + + if res == "intermediate" and isinstance(span_kind, str): + if span_kind.lower() in ["entry", "server"]: + res = "entry" + if span_kind.lower() in ["exit", "client"]: + res = "exit" + + return res diff --git a/tests/agent/test_host.py b/tests/agent/test_host.py index 613d4478..e89c47ce 100644 --- a/tests/agent/test_host.py +++ b/tests/agent/test_host.py @@ -698,25 +698,53 @@ def test_diagnostics(self, caplog: pytest.LogCaptureFixture) -> None: assert "should_send_snapshot_data: True" in caplog.messages def test_is_service_or_endpoint_ignored(self) -> None: - self.agent.options.ignore_endpoints.append("service1.*") - self.agent.options.ignore_endpoints.append("service2.method1") + self.agent.options.span_filters = { + "include": [], + "exclude": [ + { + "name": "service1-all", + "suppression": True, + "attributes": [ + {"key": "type", "values": ["service1"], "match_type": "strict"} + ], + }, + { + "name": "service2-method1", + "suppression": True, + "attributes": [ + {"key": "type", "values": ["service2"], "match_type": "strict"}, + { + "key": "endpoint", + "values": ["method1"], + "match_type": "strict", + }, + ], + }, + ], + } # ignore all endpoints of service1 - assert self.agent._HostAgent__is_endpoint_ignored("service1") - assert self.agent._HostAgent__is_endpoint_ignored("service1", "method1") - assert self.agent._HostAgent__is_endpoint_ignored("service1", "method2") - - # case-insensitive - assert self.agent._HostAgent__is_endpoint_ignored("SERVICE1") - assert self.agent._HostAgent__is_endpoint_ignored("service1", "METHOD1") + assert self.agent._HostAgent__is_endpoint_ignored({"type": "service1"}) + assert self.agent._HostAgent__is_endpoint_ignored( + {"type": "service1", "endpoint": "method1"} + ) + assert self.agent._HostAgent__is_endpoint_ignored( + {"type": "service1", "endpoint": "method2"} + ) # ignore only endpoint1 of service2 - assert self.agent._HostAgent__is_endpoint_ignored("service2", "method1") - assert not self.agent._HostAgent__is_endpoint_ignored("service2", "method2") + assert self.agent._HostAgent__is_endpoint_ignored( + {"type": "service2", "endpoint": "method1"} + ) + assert not self.agent._HostAgent__is_endpoint_ignored( + {"type": "service2", "endpoint": "method2"} + ) # don't ignore other services - assert not self.agent._HostAgent__is_endpoint_ignored("service3") - assert not self.agent._HostAgent__is_endpoint_ignored("service3") + assert not self.agent._HostAgent__is_endpoint_ignored({"type": "service3"}) + assert not self.agent._HostAgent__is_endpoint_ignored( + {"type": "service3", "endpoint": "method1"} + ) @pytest.mark.parametrize( "input_data", diff --git a/tests/clients/boto3/test_boto3_dynamodb.py b/tests/clients/boto3/test_boto3_dynamodb.py index fad69d66..b90902b0 100644 --- a/tests/clients/boto3/test_boto3_dynamodb.py +++ b/tests/clients/boto3/test_boto3_dynamodb.py @@ -71,8 +71,10 @@ def test_dynamodb_create_table(self) -> None: assert dynamodb_span.data["dynamodb"]["region"] == "us-west-2" assert dynamodb_span.data["dynamodb"]["table"] == "dynamodb-table" - def test_ignore_dynamodb(self) -> None: - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb" + def test_filter_dynamodb(self) -> None: + os.environ["INSTANA_TRACING_FILTER_EXCLUDE_DYNAMODB_ATTRIBUTES"] = ( + "dynamodb.op;*;strict" + ) agent.options = StandardOptions() with self.tracer.start_as_current_span("test"): @@ -95,8 +97,10 @@ def test_ignore_dynamodb(self) -> None: assert dynamodb_span not in filtered_spans - def test_ignore_create_table(self) -> None: - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb:createtable" + def test_filter_create_table(self) -> None: + os.environ["INSTANA_TRACING_FILTER_EXCLUDE_DYNAMODB_ATTRIBUTES"] = ( + "dynamodb.op;CreateTable;strict" + ) agent.options = StandardOptions() with self.tracer.start_as_current_span("test"): diff --git a/tests/clients/kafka/test_confluent_kafka.py b/tests/clients/kafka/test_confluent_kafka.py index b8913649..d632265d 100644 --- a/tests/clients/kafka/test_confluent_kafka.py +++ b/tests/clients/kafka/test_confluent_kafka.py @@ -25,7 +25,7 @@ from instana.options import StandardOptions from instana.singletons import agent, get_tracer from instana.span.span import InstanaSpan -from instana.util.config import parse_ignored_endpoints_from_yaml +from instana.util.config import parse_filtered_endpoints_from_yaml from tests.helpers import get_first_span_by_filter, testenv @@ -283,8 +283,11 @@ def test_trace_confluent_kafka_error(self) -> None: == "num_messages must be between 0 and 1000000 (1M)" ) - @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka"}) - def test_ignore_confluent_kafka(self) -> None: + @patch.dict( + os.environ, + {"INSTANA_TRACING_FILTER_EXCLUDE_KAFKA_ATTRIBUTES": "type;kafka;strict"}, + ) + def test_filter_confluent_kafka(self) -> None: agent.options.set_trace_configurations() with self.tracer.start_as_current_span("test"): self.producer.produce(testenv["kafka_topic"], b"raw_bytes") @@ -296,8 +299,13 @@ def test_ignore_confluent_kafka(self) -> None: filtered_spans = agent.filter_spans(spans) assert len(filtered_spans) == 1 - @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka:produce"}) - def test_ignore_confluent_kafka_producer(self) -> None: + @patch.dict( + os.environ, + { + "INSTANA_TRACING_FILTER_EXCLUDE_KAFKA_PRODUCER_ATTRIBUTES": "type;kafka;strict|kafka.access;produce;strict" + }, + ) + def test_filter_confluent_kafka_producer(self) -> None: agent.options.set_trace_configurations() with self.tracer.start_as_current_span("test-span"): # Produce some events @@ -322,8 +330,13 @@ def test_ignore_confluent_kafka_producer(self) -> None: filtered_spans = agent.filter_spans(spans) assert len(filtered_spans) == 1 - @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka:consume"}) - def test_ignore_confluent_kafka_consumer(self) -> None: + @patch.dict( + os.environ, + { + "INSTANA_TRACING_FILTER_EXCLUDE_KAFKA_CONSUMER_ATTRIBUTES": "type;kafka;strict|kafka.access;consume;strict" + }, + ) + def test_filter_confluent_kafka_consumer(self) -> None: agent.options.set_trace_configurations() # Produce some events self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") @@ -348,10 +361,10 @@ def test_ignore_confluent_kafka_consumer(self) -> None: @patch.dict( os.environ, { - "INSTANA_IGNORE_ENDPOINTS_PATH": "tests/util/test_configuration-1.yaml", + "INSTANA_TRACING_FILTER_EXCLUDE_KAFKA_ATTRIBUTES": "kafka.access;consume,send,produce;contains|kafka.service;span-topic,topic1,topic2;strict|kafka.access;*;strict", }, ) - def test_ignore_confluent_specific_topic(self) -> None: + def test_filter_confluent_specific_topic(self) -> None: agent.options.set_trace_configurations() self.kafka_client.create_topics( # noqa: F841 [ @@ -399,8 +412,8 @@ def test_ignore_confluent_specific_topic(self) -> None: ] ) - def test_ignore_confluent_specific_topic_with_config_file(self) -> None: - agent.options.ignore_endpoints = parse_ignored_endpoints_from_yaml( + def test_filter_confluent_specific_topic_with_config_file(self) -> None: + agent.options.span_filters = parse_filtered_endpoints_from_yaml( "tests/util/test_configuration-1.yaml" ) @@ -540,7 +553,7 @@ def test_confluent_kafka_poll_root_exit_without_trace_correlation(self) -> None: agent.options.kafka_trace_correlation = False # Produce some events - self.producer.produce(f'{testenv["kafka_topic"]}-wo-tc', b"raw_bytes1") + self.producer.produce(f"{testenv['kafka_topic']}-wo-tc", b"raw_bytes1") self.producer.flush() # Consume the events @@ -549,7 +562,7 @@ def test_confluent_kafka_poll_root_exit_without_trace_correlation(self) -> None: consumer_config["auto.offset.reset"] = "earliest" consumer = Consumer(consumer_config) - consumer.subscribe([f'{testenv["kafka_topic"]}-wo-tc']) + consumer.subscribe([f"{testenv['kafka_topic']}-wo-tc"]) msg = consumer.poll(timeout=30) # noqa: F841 @@ -562,14 +575,14 @@ def test_confluent_kafka_poll_root_exit_without_trace_correlation(self) -> None: spans, lambda span: span.n == "kafka" and span.data["kafka"]["access"] == "produce" - and span.data["kafka"]["service"] == f'{testenv["kafka_topic"]}-wo-tc', + and span.data["kafka"]["service"] == f"{testenv['kafka_topic']}-wo-tc", ) poll_span = get_first_span_by_filter( spans, lambda span: span.n == "kafka" and span.data["kafka"]["access"] == "poll" - and span.data["kafka"]["service"] == f'{testenv["kafka_topic"]}-wo-tc', + and span.data["kafka"]["service"] == f"{testenv['kafka_topic']}-wo-tc", ) # Different traceId @@ -608,12 +621,29 @@ def test_confluent_kafka_poll_root_exit_error(self) -> None: @patch.dict(os.environ, {"INSTANA_ALLOW_ROOT_EXIT_SPAN": "1"}) def test_confluent_kafka_downstream_suppression(self) -> None: - config["tracing"]["ignore_endpoints"] = { - "kafka": [ - {"methods": ["produce"], "endpoints": [f"{testenv['kafka_topic']}_1"]}, + config["tracing"]["filter"] = { + "exclude": [ + { + "name": "Kafka", + "attributes": [ + { + "key": "kafka.service", + "values": [f"{testenv['kafka_topic']}_1"], + }, + {"key": "kafka.access", "values": ["produce"]}, + ], + "suppression": True, + }, { - "methods": ["consume"], - "endpoints": [f"{testenv['kafka_topic']}_2"], + "name": "Kafka", + "attributes": [ + { + "key": "kafka.service", + "values": [f"{testenv['kafka_topic']}_2"], + }, + {"key": "kafka.access", "values": ["consume"]}, + ], + "suppression": True, }, ] } @@ -785,6 +815,12 @@ def test_confluent_kafka_poll_returns_none(self) -> None: consumer = Consumer(consumer_config) consumer.subscribe([testenv["kafka_topic"] + "_3"]) + # Consume any existing messages to ensure topic is empty + while True: + msg = consumer.poll(timeout=0.5) + if msg is None: + break + with self.tracer.start_as_current_span("test"): msg = consumer.poll(timeout=0.1) @@ -819,6 +855,8 @@ def test_confluent_kafka_poll_returns_none_with_context_cleanup(self) -> None: with self.tracer.start_as_current_span("test"): for _ in range(3): msg = consumer.poll(timeout=0.1) + if msg is not None: + print(f"DEBUG: Unexpected message: {msg.value()}") assert msg is None consumer.close() diff --git a/tests/clients/kafka/test_kafka_python.py b/tests/clients/kafka/test_kafka_python.py index 99cb8f2d..00e8c1dc 100644 --- a/tests/clients/kafka/test_kafka_python.py +++ b/tests/clients/kafka/test_kafka_python.py @@ -23,7 +23,7 @@ from instana.options import StandardOptions from instana.singletons import agent, get_tracer from instana.span.span import InstanaSpan -from instana.util.config import parse_ignored_endpoints_from_yaml +from instana.util.config import parse_filtered_endpoints_from_yaml from tests.helpers import get_first_span_by_filter, testenv @@ -96,6 +96,9 @@ def _resource(self) -> Generator[None, None, None]: ) self.kafka_client.close() + if "tracing" in config: + config.pop("tracing") + def test_trace_kafka_python_send(self) -> None: with self.tracer.start_as_current_span("test"): future = self.producer.send(testenv["kafka_topic"], b"raw_bytes") @@ -351,8 +354,11 @@ def consume_from_topic(self, topic_name: str) -> None: consumer.close() - @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka"}) - def test_ignore_kafka(self) -> None: + @patch.dict( + os.environ, + {"INSTANA_TRACING_FILTER_EXCLUDE_KAFKA_ATTRIBUTES": "type;kafka;strict"}, + ) + def test_filter_kafka(self) -> None: agent.options.set_trace_configurations() with self.tracer.start_as_current_span("test"): self.producer.send(testenv["kafka_topic"], b"raw_bytes") @@ -364,8 +370,11 @@ def test_ignore_kafka(self) -> None: filtered_spans = agent.filter_spans(spans) assert len(filtered_spans) == 1 - @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka:send"}) - def test_ignore_kafka_producer(self) -> None: + @patch.dict( + os.environ, + {"INSTANA_TRACING_FILTER_EXCLUDE_KAFKA_ATTRIBUTES": "kafka.access;send;strict"}, + ) + def test_filter_kafka_producer(self) -> None: agent.options.set_trace_configurations() with self.tracer.start_as_current_span("test-span"): # Produce some events @@ -394,8 +403,13 @@ def test_ignore_kafka_producer(self) -> None: filtered_spans = agent.filter_spans(spans) assert len(filtered_spans) == 1 - @patch.dict(os.environ, {"INSTANA_IGNORE_ENDPOINTS": "kafka:consume"}) - def test_ignore_kafka_consumer(self) -> None: + @patch.dict( + os.environ, + { + "INSTANA_TRACING_FILTER_EXCLUDE_KAFKA_ATTRIBUTES": "kafka.access;consume;strict" + }, + ) + def test_filter_kafka_consumer(self) -> None: agent.options.set_trace_configurations() # Produce some events self.producer.send(testenv["kafka_topic"], b"raw_bytes1") @@ -411,10 +425,10 @@ def test_ignore_kafka_consumer(self) -> None: @patch.dict( os.environ, { - "INSTANA_IGNORE_ENDPOINTS_PATH": "tests/util/test_configuration-1.yaml", + "INSTANA_TRACING_FILTER_EXCLUDE_KAFKA_ATTRIBUTES": "kafka.access;consume,send,produce;contains|kafka.service;span-topic,topic1,topic2;strict|kafka.access;*;strict", }, ) - def test_ignore_specific_topic(self) -> None: + def test_filter_specific_topic(self) -> None: agent.options.set_trace_configurations() with self.tracer.start_as_current_span("test-span"): # Produce some events @@ -439,8 +453,8 @@ def test_ignore_specific_topic(self) -> None: ) assert span_to_be_filtered not in filtered_spans - def test_ignore_specific_topic_with_config_file(self) -> None: - agent.options.ignore_endpoints = parse_ignored_endpoints_from_yaml( + def test_filter_specific_topic_with_config_file(self) -> None: + agent.options.span_filters = parse_filtered_endpoints_from_yaml( "tests/util/test_configuration-1.yaml" ) @@ -700,12 +714,37 @@ def test_kafka_poll_root_exit_without_trace_correlation(self) -> None: @patch.dict(os.environ, {"INSTANA_ALLOW_ROOT_EXIT_SPAN": "1"}) def test_kafka_downstream_suppression(self) -> None: - config["tracing"]["ignore_endpoints"] = { - "kafka": [ - {"methods": ["send"], "endpoints": [f"{testenv['kafka_topic']}_1"]}, + config["tracing"]["filter"] = { + "exclude": [ { - "methods": ["consume"], - "endpoints": [f"{testenv['kafka_topic']}_2"], + "name": "kafka-topic-1-suppression", + "attributes": [ + { + "key": "kafka.service", + "values": [f"{testenv['kafka_topic']}_1"], + "match_type": "strict", + }, + { + "key": "kafka.access", + "values": ["send"], + "match_type": "contains", + }, + ], + }, + { + "name": "kafka-topic-2-suppression", + "attributes": [ + { + "key": "kafka.service", + "values": [f"{testenv['kafka_topic']}_2"], + "match_type": "strict", + }, + { + "key": "kafka.access", + "values": ["consume"], + "match_type": "contains", + }, + ], }, ] } @@ -858,3 +897,19 @@ def test_clear_context(self, span: "InstanaSpan") -> None: # Verify all context is cleared assert consumer_span.get(None) is None assert kafka_python.consumer_token is None + + def test_kafka_producer_include_filter(self) -> None: + agent.options.span_filters = parse_filtered_endpoints_from_yaml( + "tests/util/test_configuration-1.yaml" + ) + with self.tracer.start_as_current_span("test-span"): + self.producer.send("topic", b"raw_bytes1") + self.producer.flush() + + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + filtered_spans = agent.filter_spans(spans) + assert len(filtered_spans) == 1 + kafka_span = [s for s in filtered_spans if s.n == "kafka"][0] + assert kafka_span.data["kafka"]["service"] == "topic" diff --git a/tests/clients/test_redis.py b/tests/clients/test_redis.py index 7096ce0a..fda1c798 100644 --- a/tests/clients/test_redis.py +++ b/tests/clients/test_redis.py @@ -25,8 +25,11 @@ def _resource(self) -> Generator[None, None, None]: self.recorder.clear_spans() self.client = redis.Redis(host=testenv["redis_host"], db=testenv["redis_db"]) yield - if "INSTANA_IGNORE_ENDPOINTS" in os.environ.keys(): - del os.environ["INSTANA_IGNORE_ENDPOINTS"] + keys_to_remove = [ + k for k in os.environ.keys() if k.startswith("INSTANA_TRACING_FILTER_") + ] + for k in keys_to_remove: + del os.environ[k] agent.options.allow_exit_as_root = False def test_set_get(self) -> None: @@ -425,8 +428,9 @@ def test_pipelined_requests(self) -> None: ) @patch("instana.span.span.InstanaSpan.record_exception") def test_execute_command_with_instana_exception(self, mock_record_func, _) -> None: - with self.tracer.start_as_current_span("test"), pytest.raises( - Exception, match="test-error" + with ( + self.tracer.start_as_current_span("test"), + pytest.raises(Exception, match="test-error"), ): self.client.set("counter", "10") mock_record_func.assert_called() @@ -450,9 +454,12 @@ def test_execute_with_instana_exception( self, caplog: pytest.LogCaptureFixture ) -> None: caplog.set_level(logging.DEBUG, logger="instana") - with self.tracer.start_as_current_span("test"), patch( - "instana.instrumentation.redis.collect_attributes", - side_effect=Exception("test-error"), + with ( + self.tracer.start_as_current_span("test"), + patch( + "instana.instrumentation.redis.collect_attributes", + side_effect=Exception("test-error"), + ), ): pipe = self.client.pipeline() pipe.set("foox", "barX") @@ -461,10 +468,12 @@ def test_execute_with_instana_exception( pipe.execute() assert "Error collecting pipeline commands" in caplog.messages - def test_ignore_redis( + def test_filter_redis( self, ) -> None: - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "redis" + os.environ["INSTANA_TRACING_FILTER_EXCLUDE_REDIS_ATTRIBUTES"] = ( + "redis.command;*;strict" + ) agent.options = StandardOptions() with self.tracer.start_as_current_span("test"): @@ -477,8 +486,10 @@ def test_ignore_redis( filtered_spans = agent.filter_spans(spans) assert len(filtered_spans) == 1 - def test_ignore_redis_single_command(self) -> None: - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "redis:set" + def test_filter_redis_single_command(self) -> None: + os.environ["INSTANA_TRACING_FILTER_EXCLUDE_REDIS_ATTRIBUTES"] = ( + "redis.command;SET;strict" + ) agent.options = StandardOptions() with self.tracer.start_as_current_span("test"): @@ -499,8 +510,10 @@ def test_ignore_redis_single_command(self) -> None: assert sdk_span.n == "sdk" - def test_ignore_redis_multiple_commands(self) -> None: - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "redis:set,get" + def test_filter_redis_multiple_commands(self) -> None: + os.environ["INSTANA_TRACING_FILTER_EXCLUDE_REDIS_ATTRIBUTES"] = ( + "redis.command;SET,GET;contains" + ) agent.options = StandardOptions() with self.tracer.start_as_current_span("test"): self.client.set("foox", "barX") @@ -516,8 +529,14 @@ def test_ignore_redis_multiple_commands(self) -> None: assert sdk_span.n == "sdk" - def test_ignore_redis_with_another_instrumentation(self) -> None: - os.environ["INSTANA_IGNORE_ENDPOINTS"] = "redis:set;something_else:something" + def test_filter_redis_with_another_instrumentation(self) -> None: + os.environ["INSTANA_TRACING_FILTER_EXCLUDE_REDIS_ATTRIBUTES"] = ( + "redis.command;SET;strict" + ) + # We simulate multiple rules by just setting the one relevant for this test + a dummy one if needed, + # or just rely on the fact that only redis interacts here. + # Original: "redis:set;something_else:something" + # Since we are setting ENV vars per policy/name, we can just set the redis one. agent.options = StandardOptions() with self.tracer.start_as_current_span("test"): self.client.set("foox", "barX") diff --git a/tests/test_options.py b/tests/test_options.py index 37c667f8..5caf47d7 100644 --- a/tests/test_options.py +++ b/tests/test_options.py @@ -30,13 +30,16 @@ def _resource(self) -> Generator[None, None, None]: def test_base_options(self) -> None: if "INSTANA_DEBUG" in os.environ: del os.environ["INSTANA_DEBUG"] + for key in list(os.environ.keys()): + if key.startswith("INSTANA_TRACING_FILTER_"): + del os.environ[key] self.base_options = BaseOptions() assert not self.base_options.debug assert self.base_options.log_level == logging.WARN assert not self.base_options.extra_http_headers assert not self.base_options.allow_exit_as_root - assert not self.base_options.ignore_endpoints + assert not self.base_options.span_filters assert self.base_options.kafka_trace_correlation assert self.base_options.secrets_matcher == "contains-ignore-case" assert self.base_options.secrets_list == ["key", "pass", "secret"] @@ -46,11 +49,11 @@ def test_base_options(self) -> None: def test_base_options_with_config(self) -> None: config["tracing"] = { - "ignore_endpoints": "service1;service3:method1,method2", + "filter": "service1;service3:method1,method2", "kafka": {"trace_correlation": True}, } self.base_options = BaseOptions() - assert self.base_options.ignore_endpoints == [ + assert self.base_options.span_filters == [ "service1.*", "service3.method1", "service3.method2", @@ -62,7 +65,8 @@ def test_base_options_with_config(self) -> None: { "INSTANA_DEBUG": "true", "INSTANA_EXTRA_HTTP_HEADERS": "SOMETHING;HERE", - "INSTANA_IGNORE_ENDPOINTS": "service1;service2:method1,method2", + "INSTANA_TRACING_FILTER_EXCLUDE_SERVICE1_ATTRIBUTES": "type;service1;strict", + "INSTANA_TRACING_FILTER_EXCLUDE_SERVICE2_ATTRIBUTES": "type;service2;strict", "INSTANA_SECRETS": "secret1:username,password", "INSTANA_TRACING_DISABLE": "logging, redis,kafka", }, @@ -74,11 +78,25 @@ def test_base_options_with_env_vars(self) -> None: assert self.base_options.extra_http_headers == ["something", "here"] - assert self.base_options.ignore_endpoints == [ - "service1.*", - "service2.method1", - "service2.method2", - ] + assert self.base_options.span_filters == { + "include": [], + "exclude": [ + { + "name": "SERVICE1", + "attributes": [ + {"key": "type", "values": ["service1"], "match_type": "strict"} + ], + "suppression": True, + }, + { + "name": "SERVICE2", + "attributes": [ + {"key": "type", "values": ["service2"], "match_type": "strict"} + ], + "suppression": True, + }, + ], + } assert self.base_options.secrets_matcher == "secret1" assert self.base_options.secrets_list == ["username", "password"] @@ -90,32 +108,95 @@ def test_base_options_with_env_vars(self) -> None: @patch.dict( os.environ, - {"INSTANA_IGNORE_ENDPOINTS_PATH": "tests/util/test_configuration-1.yaml"}, + {"INSTANA_CONFIG_PATH": "tests/util/test_configuration-1.yaml"}, ) def test_base_options_with_endpoint_file(self) -> None: self.base_options = BaseOptions() - assert self.base_options.ignore_endpoints == [ - "redis.get", - "redis.type", - "dynamodb.query", - "kafka.consume.span-topic", - "kafka.consume.topic1", - "kafka.consume.topic2", - "kafka.send.span-topic", - "kafka.send.topic1", - "kafka.send.topic2", - "kafka.consume.topic3", - "kafka.*.span-topic", - "kafka.*.topic4", - ] + assert self.base_options.span_filters == { + "include": [ + { + "name": "Kafka Producer", + "attributes": [ + {"key": "type", "values": ["kafka"], "match_type": "strict"}, + {"key": "kind", "values": ["exit"], "match_type": "strict"}, + { + "key": "kafka.service", + "values": ["topic"], + "match_type": "contains", + }, + ], + "suppression": None, + } + ], + "exclude": [ + { + "name": "Redis", + "attributes": [ + {"key": "command", "values": ["get"], "match_type": "strict"}, + {"key": "get", "values": ["type"], "match_type": "strict"}, + ], + "suppression": True, + }, + { + "name": "DynamoDB", + "attributes": [ + {"key": "op", "values": ["query"], "match_type": "strict"} + ], + "suppression": True, + }, + { + "name": "Kafka", + "attributes": [ + { + "key": "kafka.access", + "values": ["consume", "send", "produce"], + "match_type": "contains", + }, + { + "key": "kafka.service", + "values": ["span-topic", "topic1", "topic2"], + "match_type": "strict", + }, + { + "key": "kafka.access", + "values": ["*"], + "match_type": "strict", + }, + ], + "suppression": True, + }, + { + "name": "Protocols Category", + "suppression": True, + "attributes": [ + { + "key": "category", + "values": ["protocols"], + "match_type": "strict", + } + ], + }, + { + "name": "Entry Span Kind", + "suppression": True, + "attributes": [ + { + "key": "kind", + "values": ["intermediate"], + "match_type": "strict", + } + ], + }, + ], + } del self.base_options @patch.dict( os.environ, { - "INSTANA_IGNORE_ENDPOINTS": "env_service1;env_service2:method1,method2", + "INSTANA_TRACING_FILTER_EXCLUDE_SERVICE1_ATTRIBUTES": "type;env_service1;strict", + "INSTANA_TRACING_FILTER_EXCLUDE_SERVICE2_ATTRIBUTES": "type;env_service2.method1,env_service2.method2;strict", "INSTANA_KAFKA_TRACE_CORRELATION": "false", - "INSTANA_IGNORE_ENDPOINTS_PATH": "tests/util/test_configuration-1.yaml", "INSTANA_TRACING_DISABLE": "logging,redis, kafka", }, ) @@ -126,15 +207,13 @@ def test_set_trace_configurations_by_env_variable(self) -> None: # in-code configuration config["tracing"] = {} - config["tracing"]["ignore_endpoints"] = ( - "config_service1;config_service2:method1,method2" - ) + config["tracing"]["filter"] = "config_service1;config_service2:method1,method2" config["tracing"]["kafka"] = {"trace_correlation": True} config["tracing"]["disable"] = [{"databases": True}] # agent config (configuration.yaml) test_tracing = { - "ignore-endpoints": "service1;service2:method1,method2", + "filter": "service1;service2:method1,method2", "disable": [ {"messaging": True}, ], @@ -144,11 +223,33 @@ def test_set_trace_configurations_by_env_variable(self) -> None: self.base_options = StandardOptions() self.base_options.set_tracing(test_tracing) - assert self.base_options.ignore_endpoints == [ - "env_service1.*", - "env_service2.method1", - "env_service2.method2", - ] + assert self.base_options.span_filters == { + "include": [], + "exclude": [ + { + "name": "SERVICE1", + "attributes": [ + { + "key": "type", + "values": ["env_service1"], + "match_type": "strict", + } + ], + "suppression": True, + }, + { + "name": "SERVICE2", + "attributes": [ + { + "key": "type", + "values": ["env_service2.method1", "env_service2.method2"], + "match_type": "strict", + } + ], + "suppression": True, + }, + ], + } assert not self.base_options.kafka_trace_correlation # Check disabled_spans list @@ -163,24 +264,22 @@ def test_set_trace_configurations_by_env_variable(self) -> None: os.environ, { "INSTANA_KAFKA_TRACE_CORRELATION": "false", - "INSTANA_IGNORE_ENDPOINTS_PATH": "tests/util/test_configuration-1.yaml", + "INSTANA_CONFIG_PATH": "tests/util/test_configuration-1.yaml", }, ) def test_set_trace_configurations_by_in_code_configuration(self) -> None: # The priority is as follows: - # in-code configuration > agent config (configuration.yaml) > default value + # environment variables (INSTANA_CONFIG_PATH) > in-code configuration > agent config (configuration.yaml) > default value # in-code configuration config["tracing"] = {} - config["tracing"]["ignore_endpoints"] = ( - "config_service1;config_service2:method1,method2" - ) + config["tracing"]["filter"] = "config_service1;config_service2:method1,method2" config["tracing"]["kafka"] = {"trace_correlation": True} config["tracing"]["disable"] = [{"databases": True}] # agent config (configuration.yaml) test_tracing = { - "ignore-endpoints": "service1;service2:method1,method2", + "filter": "service1;service2:method1,method2", "disable": [ {"messaging": True}, ], @@ -189,41 +288,102 @@ def test_set_trace_configurations_by_in_code_configuration(self) -> None: self.base_options = StandardOptions() self.base_options.set_tracing(test_tracing) - assert self.base_options.ignore_endpoints == [ - "redis.get", - "redis.type", - "dynamodb.query", - "kafka.consume.span-topic", - "kafka.consume.topic1", - "kafka.consume.topic2", - "kafka.send.span-topic", - "kafka.send.topic1", - "kafka.send.topic2", - "kafka.consume.topic3", - "kafka.*.span-topic", - "kafka.*.topic4", - ] + assert self.base_options.span_filters == { + "include": [ + { + "name": "Kafka Producer", + "attributes": [ + {"key": "type", "values": ["kafka"], "match_type": "strict"}, + {"key": "kind", "values": ["exit"], "match_type": "strict"}, + { + "key": "kafka.service", + "values": ["topic"], + "match_type": "contains", + }, + ], + "suppression": None, + } + ], + "exclude": [ + { + "name": "Redis", + "attributes": [ + {"key": "command", "values": ["get"], "match_type": "strict"}, + {"key": "get", "values": ["type"], "match_type": "strict"}, + ], + "suppression": True, + }, + { + "name": "DynamoDB", + "attributes": [ + {"key": "op", "values": ["query"], "match_type": "strict"} + ], + "suppression": True, + }, + { + "name": "Kafka", + "attributes": [ + { + "key": "kafka.access", + "values": ["consume", "send", "produce"], + "match_type": "contains", + }, + { + "key": "kafka.service", + "values": ["span-topic", "topic1", "topic2"], + "match_type": "strict", + }, + { + "key": "kafka.access", + "values": ["*"], + "match_type": "strict", + }, + ], + "suppression": True, + }, + { + "name": "Protocols Category", + "suppression": True, + "attributes": [ + { + "key": "category", + "values": ["protocols"], + "match_type": "strict", + } + ], + }, + { + "name": "Entry Span Kind", + "suppression": True, + "attributes": [ + { + "key": "kind", + "values": ["intermediate"], + "match_type": "strict", + } + ], + }, + ], + } # Check disabled_spans list assert "databases" in self.base_options.disabled_spans - assert "logging" not in self.base_options.disabled_spans + assert "logging" in self.base_options.disabled_spans assert "redis" not in self.base_options.disabled_spans assert "kafka" not in self.base_options.disabled_spans assert "messaging" not in self.base_options.disabled_spans - assert len(self.base_options.enabled_spans) == 0 + assert "redis" in self.base_options.enabled_spans def test_set_trace_configurations_by_in_code_variable(self) -> None: config["tracing"] = {} - config["tracing"]["ignore_endpoints"] = ( - "config_service1;config_service2:method1,method2" - ) + config["tracing"]["filter"] = "config_service1;config_service2:method1,method2" config["tracing"]["kafka"] = {"trace_correlation": True} - test_tracing = {"ignore-endpoints": "service1;service2:method1,method2"} + test_tracing = {"filter": "service1;service2:method1,method2"} self.base_options = StandardOptions() self.base_options.set_tracing(test_tracing) - assert self.base_options.ignore_endpoints == [ + assert self.base_options.span_filters == [ "config_service1.*", "config_service2.method1", "config_service2.method2", @@ -232,7 +392,7 @@ def test_set_trace_configurations_by_in_code_variable(self) -> None: def test_set_trace_configurations_by_agent_configuration(self) -> None: test_tracing = { - "ignore-endpoints": "service1;service2:method1,method2", + "filter": "service1;service2:method1,method2", "trace-correlation": True, "disable": [ { @@ -246,7 +406,7 @@ def test_set_trace_configurations_by_agent_configuration(self) -> None: self.base_options = StandardOptions() self.base_options.set_tracing(test_tracing) - assert self.base_options.ignore_endpoints == [ + assert self.base_options.span_filters == [ "service1.*", "service2.method1", "service2.method2", @@ -263,7 +423,7 @@ def test_set_trace_configurations_by_default(self) -> None: self.base_options = StandardOptions() self.base_options.set_tracing({}) - assert not self.base_options.ignore_endpoints + assert not self.base_options.span_filters assert self.base_options.kafka_trace_correlation assert len(self.base_options.disabled_spans) == 0 assert len(self.base_options.enabled_spans) == 0 @@ -326,6 +486,52 @@ def test_is_span_disabled_method(self) -> None: assert self.base_options.is_span_disabled(span_type="mysql") assert not self.base_options.is_span_disabled(span_type="redis") + @patch.dict( + os.environ, + { + "INSTANA_TRACING_FILTER_EXCLUDE_KAFKA_ATTRIBUTES": "kafka.service;kafka;strict", + "INSTANA_TRACING_FILTER_EXCLUDE_REDIS_ATTRIBUTES": "redis.command;SET,GET;contains", + "INSTANA_TRACING_FILTER_INCLUDE_FOO_ATTRIBUTES": "http.url;foo;contains", + }, + ) + def test_tracing_filter_environment_variables(self) -> None: + self.base_options = StandardOptions() + assert self.base_options.span_filters == { + "include": [ + { + "name": "FOO", + "attributes": [ + {"key": "http.url", "values": ["foo"], "match_type": "contains"} + ], + "suppression": None, + } + ], + "exclude": [ + { + "name": "KAFKA", + "attributes": [ + { + "key": "kafka.service", + "values": ["kafka"], + "match_type": "strict", + } + ], + "suppression": True, + }, + { + "name": "REDIS", + "attributes": [ + { + "key": "redis.command", + "values": ["SET", "GET"], + "match_type": "contains", + } + ], + "suppression": True, + }, + ], + } + class TestStandardOptions: @pytest.fixture(autouse=True) @@ -364,12 +570,12 @@ def test_set_tracing( self.standart_options = StandardOptions() test_tracing = { - "ignore-endpoints": "service1;service2:method1,method2", + "filter": "service1;service2:method1,method2", "kafka": {"trace-correlation": "false", "header-format": "binary"}, } self.standart_options.set_tracing(test_tracing) - assert self.standart_options.ignore_endpoints == [ + assert self.standart_options.span_filters == [ "service1.*", "service2.method1", "service2.method2", @@ -404,7 +610,7 @@ def test_set_from(self) -> None: self.standart_options = StandardOptions() test_res_data = { "secrets": {"matcher": "sample-match", "list": ["sample", "list"]}, - "tracing": {"ignore-endpoints": "service1;service2:method1,method2"}, + "tracing": {"filter": "service1;service2:method1,method2"}, } self.standart_options.set_from(test_res_data) @@ -412,7 +618,7 @@ def test_set_from(self) -> None: self.standart_options.secrets_matcher == test_res_data["secrets"]["matcher"] ) assert self.standart_options.secrets_list == test_res_data["secrets"]["list"] - assert self.standart_options.ignore_endpoints == [ + assert self.standart_options.span_filters == [ "service1.*", "service2.method1", "service2.method2", @@ -443,7 +649,7 @@ def test_set_from_bool( ) assert self.standart_options.secrets_list == ["key", "pass", "secret"] - assert self.standart_options.ignore_endpoints == [] + assert self.standart_options.span_filters == {} assert not self.standart_options.extra_http_headers @@ -460,7 +666,7 @@ def test_serverless_options(self) -> None: assert self.serverless_options.log_level == logging.WARN assert not self.serverless_options.extra_http_headers assert not self.serverless_options.allow_exit_as_root - assert not self.serverless_options.ignore_endpoints + assert not self.serverless_options.span_filters assert self.serverless_options.secrets_matcher == "contains-ignore-case" assert self.serverless_options.secrets_list == ["key", "pass", "secret"] assert not self.serverless_options.secrets @@ -605,7 +811,7 @@ def test_gcr_options(self) -> None: assert self.gcr_options.log_level == logging.WARN assert not self.gcr_options.extra_http_headers assert not self.gcr_options.allow_exit_as_root - assert not self.gcr_options.ignore_endpoints + assert not self.gcr_options.span_filters assert self.gcr_options.secrets_matcher == "contains-ignore-case" assert self.gcr_options.secrets_list == ["key", "pass", "secret"] assert not self.gcr_options.secrets @@ -650,7 +856,7 @@ def _resource(self) -> Generator[None, None, None]: def test_stack_trace_defaults(self) -> None: """Test default stack trace configuration.""" self.options = BaseOptions() - + assert self.options.stack_trace_level == "all" assert self.options.stack_trace_length == 30 assert self.options.stack_trace_technology_config == {} @@ -692,7 +898,7 @@ def test_stack_trace_level_env_var_invalid( @pytest.mark.parametrize( "length_value,expected_length", [ - ("25", 25), + ("25", 25), ("60", 60), # Not capped here, capped when add_stack() is called ], ) @@ -744,10 +950,7 @@ def test_stack_trace_both_env_vars(self) -> None: def test_stack_trace_in_code_config(self) -> None: """Test in-code configuration for stack trace.""" config["tracing"] = { - "global": { - "stack_trace": "error", - "stack_trace_length": 20 - } + "global": {"stack_trace": "error", "stack_trace_length": 20} } self.options = BaseOptions() assert self.options.stack_trace_level == "error" @@ -756,27 +959,17 @@ def test_stack_trace_in_code_config(self) -> None: def test_stack_trace_agent_config(self) -> None: """Test agent configuration for stack trace.""" self.options = StandardOptions() - - test_tracing = { - "global": { - "stack-trace": "error", - "stack-trace-length": 15 - } - } + + test_tracing = {"global": {"stack-trace": "error", "stack-trace-length": 15}} self.options.set_tracing(test_tracing) - + assert self.options.stack_trace_level == "error" assert self.options.stack_trace_length == 15 def test_stack_trace_precedence_env_over_in_code(self) -> None: """Test environment variables take precedence over in-code config.""" - config["tracing"] = { - "global": { - "stack_trace": "all", - "stack_trace_length": 10 - } - } - + config["tracing"] = {"global": {"stack_trace": "all", "stack_trace_length": 10}} + with patch.dict( os.environ, { @@ -791,22 +984,14 @@ def test_stack_trace_precedence_env_over_in_code(self) -> None: def test_stack_trace_precedence_in_code_over_agent(self) -> None: """Test in-code config takes precedence over agent config.""" config["tracing"] = { - "global": { - "stack_trace": "error", - "stack_trace_length": 20 - } + "global": {"stack_trace": "error", "stack_trace_length": 20} } - + self.options = StandardOptions() - - test_tracing = { - "global": { - "stack-trace": "all", - "stack-trace-length": 10 - } - } + + test_tracing = {"global": {"stack-trace": "all", "stack-trace-length": 10}} self.options.set_tracing(test_tracing) - + # In-code config should win assert self.options.stack_trace_level == "error" assert self.options.stack_trace_length == 20 @@ -814,36 +999,28 @@ def test_stack_trace_precedence_in_code_over_agent(self) -> None: def test_stack_trace_technology_specific_override(self) -> None: """Test technology-specific stack trace configuration.""" self.options = StandardOptions() - + test_tracing = { - "global": { - "stack-trace": "error", - "stack-trace-length": 25 - }, - "kafka": { - "stack-trace": "all", - "stack-trace-length": 35 - }, - "redis": { - "stack-trace": "none" - } + "global": {"stack-trace": "error", "stack-trace-length": 25}, + "kafka": {"stack-trace": "all", "stack-trace-length": 35}, + "redis": {"stack-trace": "none"}, } self.options.set_tracing(test_tracing) - + # Global config assert self.options.stack_trace_level == "error" assert self.options.stack_trace_length == 25 - + # Kafka-specific override level, length = self.options.get_stack_trace_config("kafka-producer") assert level == "all" assert length == 35 - + # Redis-specific override (inherits length from global) level, length = self.options.get_stack_trace_config("redis") assert level == "none" assert length == 25 - + # Non-overridden span uses global level, length = self.options.get_stack_trace_config("mysql") assert level == "error" @@ -855,12 +1032,12 @@ def test_get_stack_trace_config_with_hyphenated_span_name(self) -> None: self.options.stack_trace_technology_config = { "kafka": {"level": "all", "length": 35} } - + # Should match "kafka" from "kafka-producer" level, length = self.options.get_stack_trace_config("kafka-producer") assert level == "all" assert length == 35 - + # Should match "kafka" from "kafka-consumer" level, length = self.options.get_stack_trace_config("kafka-consumer") assert level == "all" @@ -891,9 +1068,9 @@ def test_stack_trace_yaml_config_with_prefix( assert self.options.stack_trace_length == 20 assert ( - 'Please use "tracing" instead of "com.instana.tracing" for local configuration file.' - in caplog.messages - ) + 'Please use "tracing" instead of "com.instana.tracing" for local configuration file.' + in caplog.messages + ) def test_stack_trace_yaml_config_disabled(self) -> None: """Test YAML configuration with stack trace disabled.""" @@ -920,13 +1097,9 @@ def test_stack_trace_yaml_config_invalid( assert self.options.stack_trace_level == "all" assert self.options.stack_trace_length == 30 assert any( - "Invalid stack-trace value" in message - for message in caplog.messages - ) - assert any( - "must be positive" in message - for message in caplog.messages + "Invalid stack-trace value" in message for message in caplog.messages ) + assert any("must be positive" in message for message in caplog.messages) def test_stack_trace_yaml_config_partial(self) -> None: """Test YAML configuration with only stack-trace (no length).""" @@ -956,12 +1129,9 @@ def test_stack_trace_precedence_env_over_yaml(self) -> None: def test_stack_trace_precedence_yaml_over_in_code(self) -> None: """Test YAML config takes precedence over in-code config.""" config["tracing"] = { - "global": { - "stack_trace": "error", - "stack_trace_length": 10 - } + "global": {"stack_trace": "error", "stack_trace_length": 10} } - + with patch.dict( os.environ, {"INSTANA_CONFIG_PATH": "tests/util/test_stack_trace_config_1.yaml"}, @@ -978,15 +1148,10 @@ def test_stack_trace_precedence_yaml_over_agent(self) -> None: {"INSTANA_CONFIG_PATH": "tests/util/test_stack_trace_config_2.yaml"}, ): self.options = StandardOptions() - - test_tracing = { - "global": { - "stack-trace": "all", - "stack-trace-length": 30 - } - } + + test_tracing = {"global": {"stack-trace": "all", "stack-trace-length": 30}} self.options.set_tracing(test_tracing) - + # YAML should override agent config assert self.options.stack_trace_level == "error" assert self.options.stack_trace_length == 20 diff --git a/tests/util/test_config.py b/tests/util/test_config.py index 741d5658..b77398eb 100644 --- a/tests/util/test_config.py +++ b/tests/util/test_config.py @@ -2,10 +2,12 @@ import pytest -from instana.util.config import (is_truthy, parse_endpoints_of_service, - parse_ignored_endpoints, - parse_ignored_endpoints_dict, - parse_kafka_methods, parse_service_pair) +from instana.util.config import ( + is_truthy, + parse_filtered_endpoints, + parse_filtered_endpoints_dict, + parse_service_pair, +) class TestConfig: @@ -15,19 +17,19 @@ def test_parse_service_pair(self) -> None: assert response == ["service1.method1", "service1.method2"] test_string = "service1;service2" - response = parse_ignored_endpoints(test_string) + response = parse_filtered_endpoints(test_string) assert response == ["service1.*", "service2.*"] test_string = "service1" - response = parse_ignored_endpoints(test_string) + response = parse_filtered_endpoints(test_string) assert response == ["service1.*"] test_string = ";" - response = parse_ignored_endpoints(test_string) + response = parse_filtered_endpoints(test_string) assert response == [] test_string = "service1:method1,method2;;;service2:method1;;" - response = parse_ignored_endpoints(test_string) + response = parse_filtered_endpoints(test_string) assert response == [ "service1.method1", "service1.method2", @@ -35,28 +37,28 @@ def test_parse_service_pair(self) -> None: ] test_string = "" - response = parse_ignored_endpoints(test_string) + response = parse_filtered_endpoints(test_string) assert response == [] - def test_parse_ignored_endpoints_string(self) -> None: + def test_parse_filtered_endpoints_string(self) -> None: test_string = "service1:method1,method2" response = parse_service_pair(test_string) assert response == ["service1.method1", "service1.method2"] test_string = "service1;service2" - response = parse_ignored_endpoints(test_string) + response = parse_filtered_endpoints(test_string) assert response == ["service1.*", "service2.*"] test_string = "service1" - response = parse_ignored_endpoints(test_string) + response = parse_filtered_endpoints(test_string) assert response == ["service1.*"] test_string = ";" - response = parse_ignored_endpoints(test_string) + response = parse_filtered_endpoints(test_string) assert response == [] test_string = "service1:method1,method2;;;service2:method1;;" - response = parse_ignored_endpoints(test_string) + response = parse_filtered_endpoints(test_string) assert response == [ "service1.method1", "service1.method2", @@ -64,49 +66,66 @@ def test_parse_ignored_endpoints_string(self) -> None: ] test_string = "" - response = parse_ignored_endpoints(test_string) + response = parse_filtered_endpoints(test_string) assert response == [] - def test_parse_ignored_endpoints_dict(self) -> None: - test_dict = {"service1": ["method1", "method2"]} - response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1.method1", "service1.method2"] - - test_dict = {"SERVICE1": ["method1", "method2"]} - response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1.method1", "service1.method2"] - - test_dict = {"service1": [], "service2": []} - response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1.*", "service2.*"] - - test_dict = {"service1": []} - response = parse_ignored_endpoints_dict(test_dict) - assert response == ["service1.*"] + def test_parse_filtered_endpoints_dict(self) -> None: + test_dict = { + "exclude": [ + { + "name": "test_exclude", + "attributes": [ + { + "key": "http.url", + "values": ["/health"], + "match_type": "equals", + } + ], + } + ], + "include": [], + } + response = parse_filtered_endpoints_dict(test_dict) + assert response == { + "exclude": [ + { + "name": "test_exclude", + "suppression": True, + "attributes": [ + { + "key": "http.url", + "values": ["/health"], + "match_type": "equals", + } + ], + } + ], + "include": [], + } test_dict = {} - response = parse_ignored_endpoints_dict(test_dict) - assert response == [] + response = parse_filtered_endpoints_dict(test_dict) + assert response == {"exclude": [], "include": []} - def test_parse_ignored_endpoints(self) -> None: + def test_parse_filtered_endpoints(self) -> None: test_pair = "service1:method1,method2" - response = parse_ignored_endpoints(test_pair) + response = parse_filtered_endpoints(test_pair) assert response == ["service1.method1", "service1.method2"] test_pair = "service1;service2" - response = parse_ignored_endpoints(test_pair) + response = parse_filtered_endpoints(test_pair) assert response == ["service1.*", "service2.*"] test_pair = "service1" - response = parse_ignored_endpoints(test_pair) + response = parse_filtered_endpoints(test_pair) assert response == ["service1.*"] test_pair = ";" - response = parse_ignored_endpoints(test_pair) + response = parse_filtered_endpoints(test_pair) assert response == [] test_pair = "service1:method1,method2;;;service2:method1;;" - response = parse_ignored_endpoints(test_pair) + response = parse_filtered_endpoints(test_pair) assert response == [ "service1.method1", "service1.method2", @@ -114,77 +133,66 @@ def test_parse_ignored_endpoints(self) -> None: ] test_pair = "" - response = parse_ignored_endpoints(test_pair) + response = parse_filtered_endpoints(test_pair) assert response == [] - test_dict = {"service1": ["method1", "method2"]} - response = parse_ignored_endpoints(test_dict) - assert response == ["service1.method1", "service1.method2"] - - test_dict = {"service1": [], "service2": []} - response = parse_ignored_endpoints(test_dict) - assert response == ["service1.*", "service2.*"] - - test_dict = {"service1": []} - response = parse_ignored_endpoints(test_dict) - assert response == ["service1.*"] - - test_dict = {} - response = parse_ignored_endpoints(test_dict) - assert response == [] - - def test_parse_endpoints_of_service(self) -> None: - test_ignore_endpoints = { - "service1": ["method1", "method2"], - "service2": ["method3", "method4"], - "kafka": [ + test_dict = { + "exclude": [ { - "methods": ["method5", "method6"], - "endpoints": ["endpoint1", "endpoint2"], + "name": "test_exclude", + "attributes": [ + { + "key": "http.url", + "values": ["/health"], + "match_type": "equals", + } + ], } ], + "include": [], + } + response = parse_filtered_endpoints(test_dict) + assert response == { + "exclude": [ + { + "name": "test_exclude", + "suppression": True, + "attributes": [ + { + "key": "http.url", + "values": ["/health"], + "match_type": "equals", + } + ], + } + ], + "include": [], } - ignore_endpoints = [] - for service, methods in test_ignore_endpoints.items(): - ignore_endpoints.extend(parse_endpoints_of_service([], service, methods)) - assert ignore_endpoints == [ - "service1.method1", - "service1.method2", - "service2.method3", - "service2.method4", - "kafka.method5.endpoint1", - "kafka.method5.endpoint2", - "kafka.method6.endpoint1", - "kafka.method6.endpoint2", - ] - def test_parse_kafka_methods_as_dict(self) -> None: - test_rule_as_dict = {"methods": ["send"], "endpoints": ["topic1"]} - parsed_rule = parse_kafka_methods(test_rule_as_dict) - assert parsed_rule == ["kafka.send.topic1"] - - def test_parse_kafka_methods_as_str(self) -> None: - test_rule_as_str = ["send"] - parsed_rule = parse_kafka_methods(test_rule_as_str) - assert parsed_rule == ["kafka.send.*"] - - @pytest.mark.parametrize("value, expected", [ - (True, True), - (False, False), - ("True", True), - ("true", True), - ("1", True), - (1, True), - ("False", False), - ("false", False), - ("0", False), - (0, False), - (None, False), - ("TRUE", True), - ("FALSE", False), - ("yes", False), # Only "true" and "1" are considered truthy - ("no", False), - ]) + test_dict = {} + response = parse_filtered_endpoints(test_dict) + assert response == {"exclude": [], "include": []} + + @pytest.mark.parametrize( + "value, expected", + [ + (True, True), + (False, False), + ("True", True), + ("true", True), + ("1", True), + (1, True), + ("False", False), + ("false", False), + ("0", False), + (0, False), + (None, False), + ("TRUE", True), + ("FALSE", False), + ("yes", False), # Only "true" and "1" are considered truthy + ("no", False), + ], + ) def test_is_truthy(self, value, expected) -> None: """Test the is_truthy function with various input values.""" assert is_truthy(value) == expected diff --git a/tests/util/test_config_reader.py b/tests/util/test_config_reader.py index c5753f8e..365730d3 100644 --- a/tests/util/test_config_reader.py +++ b/tests/util/test_config_reader.py @@ -9,7 +9,7 @@ from instana.util.config import ( get_disable_trace_configurations_from_yaml, - parse_ignored_endpoints_from_yaml, + parse_filtered_endpoints_from_yaml, ) from instana.util.config_reader import ConfigReader @@ -75,24 +75,87 @@ def test_config_reader_yaml_error( def test_load_configuration_with_tracing(self, caplog: "LogCaptureFixture") -> None: caplog.set_level(logging.DEBUG, logger="instana") - ignore_endpoints = parse_ignored_endpoints_from_yaml( + span_filters = parse_filtered_endpoints_from_yaml( "tests/util/test_configuration-1.yaml" ) # test with tracing - assert ignore_endpoints == [ - "redis.get", - "redis.type", - "dynamodb.query", - "kafka.consume.span-topic", - "kafka.consume.topic1", - "kafka.consume.topic2", - "kafka.send.span-topic", - "kafka.send.topic1", - "kafka.send.topic2", - "kafka.consume.topic3", - "kafka.*.span-topic", - "kafka.*.topic4", - ] + assert span_filters == { + "exclude": [ + { + "name": "Redis", + "suppression": True, + "attributes": [ + {"key": "command", "values": ["get"], "match_type": "strict"}, + {"key": "get", "values": ["type"], "match_type": "strict"}, + ], + }, + { + "name": "DynamoDB", + "suppression": True, + "attributes": [ + {"key": "op", "values": ["query"], "match_type": "strict"}, + ], + }, + { + "name": "Kafka", + "suppression": True, + "attributes": [ + { + "key": "kafka.access", + "values": ["consume", "send", "produce"], + "match_type": "contains", + }, + { + "key": "kafka.service", + "values": ["span-topic", "topic1", "topic2"], + "match_type": "strict", + }, + { + "key": "kafka.access", + "values": ["*"], + "match_type": "strict", + }, + ], + }, + { + "name": "Protocols Category", + "suppression": True, + "attributes": [ + { + "key": "category", + "values": ["protocols"], + "match_type": "strict", + } + ], + }, + { + "name": "Entry Span Kind", + "suppression": True, + "attributes": [ + { + "key": "kind", + "values": ["intermediate"], + "match_type": "strict", + } + ], + }, + ], + "include": [ + { + "name": "Kafka Producer", + "suppression": None, + "attributes": [ + {"key": "type", "values": ["kafka"], "match_type": "strict"}, + {"key": "kind", "values": ["exit"], "match_type": "strict"}, + { + "key": "kafka.service", + "values": ["topic"], + "match_type": "contains", + }, + ], + } + ], + } os.environ["INSTANA_CONFIG_PATH"] = "tests/util/test_configuration-1.yaml" disabled_spans, enabled_spans = get_disable_trace_configurations_from_yaml() @@ -110,24 +173,50 @@ def test_load_configuration_with_tracing(self, caplog: "LogCaptureFixture") -> N def test_load_configuration_legacy(self, caplog: "LogCaptureFixture") -> None: caplog.set_level(logging.DEBUG, logger="instana") - ignore_endpoints = parse_ignored_endpoints_from_yaml( + span_filters = parse_filtered_endpoints_from_yaml( "tests/util/test_configuration-2.yaml" ) - assert ignore_endpoints == [ - "redis.get", - "redis.type", - "dynamodb.query", - "kafka.send.*", - "kafka.consume.span-topic", - "kafka.consume.topic1", - "kafka.consume.topic2", - "kafka.send.span-topic", - "kafka.send.topic1", - "kafka.send.topic2", - "kafka.consume.topic3", - "kafka.*.span-topic", - "kafka.*.topic4", - ] + assert span_filters == { + "exclude": [ + { + "name": "Redis", + "suppression": True, + "attributes": [ + {"key": "command", "values": ["get"], "match_type": "strict"}, + {"key": "get", "values": ["type"], "match_type": "strict"}, + ], + }, + { + "name": "DynamoDB", + "suppression": True, + "attributes": [ + {"key": "op", "values": ["query"], "match_type": "strict"}, + ], + }, + { + "name": "Kafka", + "suppression": True, + "attributes": [ + { + "key": "kafka.access", + "values": ["consume", "send", "produce"], + "match_type": "contains", + }, + { + "key": "kafka.service", + "values": ["span-topic", "topic1", "topic2"], + "match_type": "strict", + }, + { + "key": "kafka.access", + "values": ["*"], + "match_type": "strict", + }, + ], + }, + ], + "include": [], + } os.environ["INSTANA_CONFIG_PATH"] = "tests/util/test_configuration-2.yaml" disabled_spans, enabled_spans = get_disable_trace_configurations_from_yaml() diff --git a/tests/util/test_configuration-1.yaml b/tests/util/test_configuration-1.yaml index ac61d362..3f19a384 100644 --- a/tests/util/test_configuration-1.yaml +++ b/tests/util/test_configuration-1.yaml @@ -2,21 +2,48 @@ # service-level configuration, aligning with in-code settings tracing: - ignore-endpoints: - redis: - - get - - type - dynamodb: - - query - kafka: - - methods: ["consume", "send"] - endpoints: ["span-topic", "topic1", "topic2"] - - methods: ["consume"] - endpoints: ["topic3"] - - methods: ["*"] # Applied to all methods - endpoints: ["span-topic", "topic4"] - # - methods: ["consume", "send"] - # endpoints: ["*"] # Applied to all topics + filter: + exclude: + - name: "Redis" + attributes: + - key: "command" + values: ["get"] + - key: "get" + values: ["type"] + - name: "DynamoDB" + attributes: + - key: "op" + values: ["query"] + - name: "Kafka" + attributes: + - key: "kafka.access" + values: ["consume", "send", "produce"] + match_type: "contains" + - key: "kafka.service" + values: ["span-topic", "topic1", "topic2"] + match_type: "strict" + - key: "kafka.access" + values: ["*"] + - name: "Protocols Category" + attributes: + - key: "category" + values: ["protocols"] + match_type: "strict" + - name: "Entry Span Kind" + attributes: + - key: "kind" + values: ["intermediate"] + match_type: "strict" + include: + - name: "Kafka Producer" + attributes: + - key: "type" + values: ["kafka"] + - key: "kind" + values: ["exit"] + - key: "kafka.service" + values: ["topic"] + match_type: "contains" disable: - "logging": true - "databases": true diff --git a/tests/util/test_configuration-2.yaml b/tests/util/test_configuration-2.yaml index 5ed83ec1..9021cc26 100644 --- a/tests/util/test_configuration-2.yaml +++ b/tests/util/test_configuration-2.yaml @@ -2,22 +2,28 @@ # service-level configuration, aligning with in-code settings com.instana.tracing: - ignore-endpoints: - redis: - - get - - type - dynamodb: - - query - kafka: - - send - - methods: ["consume", "send"] - endpoints: ["span-topic", "topic1", "topic2"] - - methods: ["consume"] - endpoints: ["topic3"] - - methods: ["*"] # Applied to all methods - endpoints: ["span-topic", "topic4"] - # - methods: ["consume", "send"] - # endpoints: ["*"] # Applied to all topics + filter: + exclude: + - name: "Redis" + attributes: + - key: "command" + values: ["get"] + - key: "get" + values: ["type"] + - name: "DynamoDB" + attributes: + - key: "op" + values: ["query"] + - name: "Kafka" + attributes: + - key: "kafka.access" + values: ["consume", "send", "produce"] + match_type: "contains" + - key: "kafka.service" + values: ["span-topic", "topic1", "topic2"] + match_type: "strict" + - key: "kafka.access" + values: ["*"] disable: - "logging": true - "databases": true diff --git a/tests/util/test_span_utils.py b/tests/util/test_span_utils.py index c2018b04..4dee18d8 100644 --- a/tests/util/test_span_utils.py +++ b/tests/util/test_span_utils.py @@ -1,24 +1,110 @@ # (c) Copyright IBM Corp. 2025 -from typing import List, Optional -import pytest - -from instana.util.span_utils import get_operation_specifiers - - -@pytest.mark.parametrize( - "span_name, expected_result", - [ - ("something", ["", ""]), - ("redis", ["command", ""]), - ("dynamodb", ["op", ""]), - ("kafka", ["access", "service"]), - ], -) -def test_get_operation_specifiers( - span_name: str, - expected_result: Optional[List[str]], -) -> None: - operation_specifier, service_specifier = get_operation_specifiers(span_name) - assert operation_specifier == expected_result[0] - assert service_specifier == expected_result[1] +from instana.util.span_utils import matches_rule, match_key_filter, get_span_kind + + +class TestSpanUtils: + def test_get_span_kind(self) -> None: + assert get_span_kind(1) == "entry" + assert get_span_kind(2) == "exit" + assert get_span_kind(3) == "intermediate" + assert get_span_kind("foo") == "intermediate" + + def test_match_key_filter(self) -> None: + # Strict + assert match_key_filter("foo", "foo", "strict") + assert not match_key_filter("foo", "bar", "strict") + + # Contains + assert match_key_filter("foobar", "oba", "contains") + assert not match_key_filter("foobar", "baz", "contains") + + # Startswith + assert match_key_filter("foobar", "foo", "startswith") + assert not match_key_filter("foobar", "bar", "startswith") + + # Endswith + assert match_key_filter("foobar", "bar", "endswith") + assert not match_key_filter("foobar", "foo", "endswith") + + # Wildcard + assert match_key_filter("whatever", "*", "strict") + assert match_key_filter("whatever", "*", "contains") + + def test_matches_rule_category(self) -> None: + # Redis is in databases category + span_attrs = {"type": "redis"} + + rule_positive = [{"key": "category", "values": ["databases"]}] + assert matches_rule(rule_positive, span_attrs) + + rule_negative = [{"key": "category", "values": ["messaging"]}] + assert not matches_rule(rule_negative, span_attrs) + + # Unknown type + span_attrs_unknown = {"type": "unknown_db"} + assert not matches_rule(rule_positive, span_attrs_unknown) + + def test_matches_rule_kind(self) -> None: + span_attrs_entry = {"kind": 1} + + rule_entry = [{"key": "kind", "values": ["entry"]}] + assert matches_rule(rule_entry, span_attrs_entry) + + rule_exit = [{"key": "kind", "values": ["exit"]}] + assert not matches_rule(rule_exit, span_attrs_entry) + + def test_matches_rule_type(self) -> None: + span_attrs = {"type": "http"} + + rule_http = [{"key": "type", "values": ["http"]}] + assert matches_rule(rule_http, span_attrs) + + rule_rpc = [{"key": "type", "values": ["rpc"]}] + assert not matches_rule(rule_rpc, span_attrs) + + def test_matches_rule_attributes(self) -> None: + span_attrs = {"http.url": "http://example.com/health", "http.status_code": 200} + + # Strict match + rule_url = [ + { + "key": "http.url", + "values": ["http://example.com/health"], + "match_type": "strict", + } + ] + assert matches_rule(rule_url, span_attrs) + + # Contains match + rule_contains = [ + {"key": "http.url", "values": ["health"], "match_type": "contains"} + ] + assert matches_rule(rule_contains, span_attrs) + + def test_matches_rule_multiple_rules(self) -> None: + # matches_rule iterates over rule_attributes (list of rules). + # Inside loop: if not rule_matched: return False (AND logic). + # So all rules must match. + + span_attrs = {"type": "http", "http.url": "http://example.com/health"} + + rules = [ + {"key": "type", "values": ["http"]}, + { + "key": "http.url", + "values": ["http://example.com/health"], + "match_type": "strict", + }, + ] + assert matches_rule(rules, span_attrs) + + rules_fail = [ + {"key": "type", "values": ["http"]}, + { + "key": "http.url", + "values": ["http://example.com/login"], + "match_type": "strict", + }, + ] + assert not matches_rule(rules_fail, span_attrs)