-
Notifications
You must be signed in to change notification settings - Fork 931
[WIP] gRPC instrumentation migration to release candidate semconv (1.40.0) #4279
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -267,25 +267,47 @@ async def serve(): | |
| ``filter_`` option also applies to both global and manual client intrumentors. | ||
|
|
||
|
|
||
| Environment variable | ||
| -------------------- | ||
| Environment variables | ||
| --------------------- | ||
|
|
||
| If you'd like to exclude specific services for the instrumentations, you can use | ||
| ``OTEL_PYTHON_GRPC_EXCLUDED_SERVICES`` environment variables. | ||
| ``OTEL_PYTHON_GRPC_EXCLUDED_SERVICES`` | ||
| Comma-separated list of service names to exclude from instrumentation. | ||
| For example, ``"GRPCTestServer,GRPCHealthServer"`` will exclude those services. | ||
|
|
||
| For example, if you assign ``"GRPCTestServer,GRPCHealthServer"`` to the variable, | ||
| then the global interceptor automatically adds the filters to exclude requests to | ||
| services ``GRPCTestServer`` and ``GRPCHealthServer``. | ||
| ``OTEL_SEMCONV_STABILITY_OPT_IN`` | ||
| Controls which version of the RPC semantic conventions the instrumentation | ||
| emits. Accepted values (comma-separated): | ||
|
|
||
| - ``rpc`` — emit only the stable (new) RPC conventions. Key changes: | ||
|
|
||
| - ``rpc.system`` → ``rpc.system.name`` | ||
| - ``rpc.grpc.status_code`` (int) → ``rpc.response.status_code`` (string, | ||
| e.g. ``"OK"``, ``"UNAVAILABLE"``); non-OK codes also set ``error.type`` | ||
| - ``rpc.method`` now contains the fully-qualified name | ||
| (e.g. ``"helloworld.Greeter/SayHello"``); ``rpc.service`` is removed | ||
| - ``net.peer.ip`` / ``net.peer.name`` / ``net.peer.port`` (server spans) | ||
| → ``client.address`` / ``client.port`` | ||
|
|
||
| - ``rpc/dup`` — emit both old and new RPC conventions simultaneously, | ||
| useful for a phased rollout. | ||
|
|
||
| - *(default, no value)* — continue emitting the old RPC conventions. | ||
|
|
||
| """ | ||
|
|
||
| import os | ||
| from typing import Callable, Collection, List, Union | ||
| from typing import Callable, Collection, List, Optional, Tuple, Union | ||
|
|
||
| import grpc # pylint:disable=import-self | ||
| from wrapt import wrap_function_wrapper as _wrap | ||
|
|
||
| from opentelemetry import trace | ||
| from opentelemetry.instrumentation._semconv import ( | ||
| _StabilityMode, | ||
| _OpenTelemetrySemanticConventionStability, | ||
| _OpenTelemetryStabilitySignalType, | ||
| ) | ||
| from opentelemetry.instrumentation.grpc._semconv import _parse_grpc_target | ||
| from opentelemetry.instrumentation.grpc.filters import ( | ||
| any_of, | ||
| negate, | ||
|
|
@@ -296,6 +318,7 @@ async def serve(): | |
| from opentelemetry.instrumentation.grpc.version import __version__ | ||
| from opentelemetry.instrumentation.instrumentor import BaseInstrumentor | ||
| from opentelemetry.instrumentation.utils import unwrap | ||
| from opentelemetry.semconv.schemas import Schemas | ||
|
|
||
| # pylint:disable=import-outside-toplevel | ||
| # pylint:disable=import-self | ||
|
|
@@ -482,15 +505,17 @@ def _uninstrument(self, **kwargs): | |
| def wrapper_fn(self, original_func, instance, args, kwargs): | ||
| channel = original_func(*args, **kwargs) | ||
| tracer_provider = kwargs.get("tracer_provider") | ||
| request_hook = self._request_hook | ||
| response_hook = self._response_hook | ||
| target = args[0] if args else kwargs.get("target", "") | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. todo: are there other ways to pass target /host:port to public api? |
||
| host, port = _parse_grpc_target(target) | ||
| return intercept_channel( | ||
| channel, | ||
| client_interceptor( | ||
| tracer_provider=tracer_provider, | ||
| filter_=self._filter, | ||
| request_hook=request_hook, | ||
| response_hook=response_hook, | ||
| request_hook=self._request_hook, | ||
| response_hook=self._response_hook, | ||
| host=host, | ||
| port=port, | ||
| ), | ||
| ) | ||
|
|
||
|
|
@@ -522,26 +547,21 @@ def __init__(self, filter_=None): | |
| def instrumentation_dependencies(self) -> Collection[str]: | ||
| return _instruments | ||
|
|
||
| def _add_interceptors(self, tracer_provider, kwargs): | ||
| def _add_interceptors(self, tracer_provider, args, kwargs): | ||
| target = args[0] if args else kwargs.get("target", "") | ||
| host, port = _parse_grpc_target(target) | ||
| ours = aio_client_interceptors( | ||
| tracer_provider=tracer_provider, | ||
| filter_=self._filter, | ||
| request_hook=self._request_hook, | ||
| response_hook=self._response_hook, | ||
| host=host, | ||
| port=port, | ||
| ) | ||
| if "interceptors" in kwargs and kwargs["interceptors"]: | ||
| kwargs["interceptors"] = list(kwargs["interceptors"]) | ||
| kwargs["interceptors"] = ( | ||
| aio_client_interceptors( | ||
| tracer_provider=tracer_provider, | ||
| filter_=self._filter, | ||
| request_hook=self._request_hook, | ||
| response_hook=self._response_hook, | ||
| ) | ||
| + kwargs["interceptors"] | ||
| ) | ||
| kwargs["interceptors"] = ours + list(kwargs["interceptors"]) | ||
| else: | ||
| kwargs["interceptors"] = aio_client_interceptors( | ||
| tracer_provider=tracer_provider, | ||
| filter_=self._filter, | ||
| request_hook=self._request_hook, | ||
| response_hook=self._response_hook, | ||
| ) | ||
|
|
||
| kwargs["interceptors"] = ours | ||
| return kwargs | ||
|
|
||
| def _instrument(self, **kwargs): | ||
|
|
@@ -552,13 +572,11 @@ def _instrument(self, **kwargs): | |
| tracer_provider = kwargs.get("tracer_provider") | ||
|
|
||
| def insecure(*args, **kwargs): | ||
| kwargs = self._add_interceptors(tracer_provider, kwargs) | ||
|
|
||
| kwargs = self._add_interceptors(tracer_provider, args, kwargs) | ||
| return self._original_insecure(*args, **kwargs) | ||
|
|
||
| def secure(*args, **kwargs): | ||
| kwargs = self._add_interceptors(tracer_provider, kwargs) | ||
|
|
||
| kwargs = self._add_interceptors(tracer_provider, args, kwargs) | ||
| return self._original_secure(*args, **kwargs) | ||
|
|
||
| grpc.aio.insecure_channel = insecure | ||
|
|
@@ -570,7 +588,8 @@ def _uninstrument(self, **kwargs): | |
|
|
||
|
|
||
| def client_interceptor( | ||
| tracer_provider=None, filter_=None, request_hook=None, response_hook=None | ||
| tracer_provider=None, filter_=None, request_hook=None, response_hook=None, | ||
| host=None, port=None, | ||
| ): | ||
| """Create a gRPC client channel interceptor. | ||
|
|
||
|
|
@@ -581,23 +600,37 @@ def client_interceptor( | |
| matches the condition. Default is None and intercept | ||
| all requests. | ||
|
|
||
| host: Server hostname parsed from the channel target. Used to set | ||
| ``server.address`` / ``net.peer.name`` on client spans. | ||
|
|
||
| port: Server port parsed from the channel target. Used to set | ||
| ``server.port`` / ``net.peer.port`` on client spans. | ||
|
|
||
| Returns: | ||
| An invocation-side interceptor object. | ||
| """ | ||
| from . import _client # noqa: PLC0415 | ||
|
|
||
| _OpenTelemetrySemanticConventionStability._initialize() | ||
| sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( | ||
| _OpenTelemetryStabilitySignalType.RPC | ||
| ) | ||
|
|
||
| tracer = trace.get_tracer( | ||
| __name__, | ||
| __version__, | ||
| tracer_provider, | ||
| schema_url="https://opentelemetry.io/schemas/1.11.0", | ||
| schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode), | ||
| ) | ||
|
|
||
| return _client.OpenTelemetryClientInterceptor( | ||
| tracer, | ||
| filter_=filter_, | ||
| request_hook=request_hook, | ||
| response_hook=response_hook, | ||
| sem_conv_opt_in_mode=sem_conv_opt_in_mode, | ||
| host=host, | ||
| port=port, | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -616,61 +649,65 @@ def server_interceptor(tracer_provider=None, filter_=None): | |
| """ | ||
| from . import _server # noqa: PLC0415 | ||
|
|
||
| _OpenTelemetrySemanticConventionStability._initialize() | ||
| sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( | ||
| _OpenTelemetryStabilitySignalType.RPC | ||
| ) | ||
|
|
||
| tracer = trace.get_tracer( | ||
| __name__, | ||
| __version__, | ||
| tracer_provider, | ||
| schema_url="https://opentelemetry.io/schemas/1.11.0", | ||
| schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode), | ||
| ) | ||
|
|
||
| return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_) | ||
| return _server.OpenTelemetryServerInterceptor( | ||
| tracer, filter_=filter_, sem_conv_opt_in_mode=sem_conv_opt_in_mode | ||
| ) | ||
|
|
||
|
|
||
| def aio_client_interceptors( | ||
| tracer_provider=None, filter_=None, request_hook=None, response_hook=None | ||
| tracer_provider=None, filter_=None, request_hook=None, response_hook=None, | ||
| host=None, port=None, | ||
| ): | ||
| """Create a gRPC client channel interceptor. | ||
|
|
||
| Args: | ||
| tracer: The tracer to use to create client-side spans. | ||
|
|
||
| host: Server hostname parsed from the channel target. | ||
| port: Server port parsed from the channel target. | ||
|
|
||
| Returns: | ||
| An invocation-side interceptor object. | ||
| """ | ||
| from . import _aio_client # noqa: PLC0415 | ||
|
|
||
| _OpenTelemetrySemanticConventionStability._initialize() | ||
| sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( | ||
| _OpenTelemetryStabilitySignalType.RPC | ||
| ) | ||
|
|
||
| tracer = trace.get_tracer( | ||
| __name__, | ||
| __version__, | ||
| tracer_provider, | ||
| schema_url="https://opentelemetry.io/schemas/1.11.0", | ||
| schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode), | ||
| ) | ||
|
|
||
| interceptor_kwargs = dict( | ||
| filter_=filter_, | ||
| request_hook=request_hook, | ||
| response_hook=response_hook, | ||
| sem_conv_opt_in_mode=sem_conv_opt_in_mode, | ||
| host=host, | ||
| port=port, | ||
| ) | ||
| return [ | ||
| _aio_client.UnaryUnaryAioClientInterceptor( | ||
| tracer, | ||
| filter_=filter_, | ||
| request_hook=request_hook, | ||
| response_hook=response_hook, | ||
| ), | ||
| _aio_client.UnaryStreamAioClientInterceptor( | ||
| tracer, | ||
| filter_=filter_, | ||
| request_hook=request_hook, | ||
| response_hook=response_hook, | ||
| ), | ||
| _aio_client.StreamUnaryAioClientInterceptor( | ||
| tracer, | ||
| filter_=filter_, | ||
| request_hook=request_hook, | ||
| response_hook=response_hook, | ||
| ), | ||
| _aio_client.StreamStreamAioClientInterceptor( | ||
| tracer, | ||
| filter_=filter_, | ||
| request_hook=request_hook, | ||
| response_hook=response_hook, | ||
| ), | ||
| _aio_client.UnaryUnaryAioClientInterceptor(tracer, **interceptor_kwargs), | ||
| _aio_client.UnaryStreamAioClientInterceptor(tracer, **interceptor_kwargs), | ||
| _aio_client.StreamUnaryAioClientInterceptor(tracer, **interceptor_kwargs), | ||
| _aio_client.StreamStreamAioClientInterceptor(tracer, **interceptor_kwargs), | ||
| ] | ||
|
|
||
|
|
||
|
|
@@ -685,15 +722,20 @@ def aio_server_interceptor(tracer_provider=None, filter_=None): | |
| """ | ||
| from . import _aio_server # noqa: PLC0415 | ||
|
|
||
| _OpenTelemetrySemanticConventionStability._initialize() | ||
| sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( | ||
| _OpenTelemetryStabilitySignalType.RPC | ||
| ) | ||
|
|
||
| tracer = trace.get_tracer( | ||
| __name__, | ||
| __version__, | ||
| tracer_provider, | ||
| schema_url="https://opentelemetry.io/schemas/1.11.0", | ||
| schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode), | ||
| ) | ||
|
|
||
| return _aio_server.OpenTelemetryAioServerInterceptor( | ||
| tracer, filter_=filter_ | ||
| tracer, filter_=filter_, sem_conv_opt_in_mode=sem_conv_opt_in_mode | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -715,3 +757,9 @@ def _parse_services(excluded_services: str) -> List[str]: | |
| else: | ||
| excluded_service_list = [] | ||
| return excluded_service_list | ||
|
|
||
| def _get_rpc_schema_url(mode: _StabilityMode) -> str: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a similar helper in progress housed somewhere else, but it's mainly for instrumentors that export multiple types e.g. SQLAlchemy doing both |
||
| if mode is _StabilityMode.DEFAULT: | ||
| return "https://opentelemetry.io/schemas/1.11.0" | ||
| # TODO: update to 1.40.0 | ||
| return Schemas.V1_38_0.value | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. todo |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this 🧡 The readthedocs of each instrumentor were what we're thinking to properly document this, e.g. #4254