Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,25 +267,47 @@ async def serve():
``filter_`` option also applies to both global and manual client intrumentors.


Environment variable
--------------------
Environment variables
---------------------

If you'd like to exclude specific services for the instrumentations, you can use
``OTEL_PYTHON_GRPC_EXCLUDED_SERVICES`` environment variables.
``OTEL_PYTHON_GRPC_EXCLUDED_SERVICES``
Comma-separated list of service names to exclude from instrumentation.
For example, ``"GRPCTestServer,GRPCHealthServer"`` will exclude those services.

For example, if you assign ``"GRPCTestServer,GRPCHealthServer"`` to the variable,
then the global interceptor automatically adds the filters to exclude requests to
services ``GRPCTestServer`` and ``GRPCHealthServer``.
``OTEL_SEMCONV_STABILITY_OPT_IN``
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this 🧡 The readthedocs of each instrumentor were what we're thinking to properly document this, e.g. #4254

Controls which version of the RPC semantic conventions the instrumentation
emits. Accepted values (comma-separated):

- ``rpc`` — emit only the stable (new) RPC conventions. Key changes:

- ``rpc.system`` → ``rpc.system.name``
- ``rpc.grpc.status_code`` (int) → ``rpc.response.status_code`` (string,
e.g. ``"OK"``, ``"UNAVAILABLE"``); non-OK codes also set ``error.type``
- ``rpc.method`` now contains the fully-qualified name
(e.g. ``"helloworld.Greeter/SayHello"``); ``rpc.service`` is removed
- ``net.peer.ip`` / ``net.peer.name`` / ``net.peer.port`` (server spans)
→ ``client.address`` / ``client.port``

- ``rpc/dup`` — emit both old and new RPC conventions simultaneously,
useful for a phased rollout.

- *(default, no value)* — continue emitting the old RPC conventions.

"""

import os
from typing import Callable, Collection, List, Union
from typing import Callable, Collection, List, Optional, Tuple, Union

import grpc # pylint:disable=import-self
from wrapt import wrap_function_wrapper as _wrap

from opentelemetry import trace
from opentelemetry.instrumentation._semconv import (
_StabilityMode,
_OpenTelemetrySemanticConventionStability,
_OpenTelemetryStabilitySignalType,
)
from opentelemetry.instrumentation.grpc._semconv import _parse_grpc_target
from opentelemetry.instrumentation.grpc.filters import (
any_of,
negate,
Expand All @@ -296,6 +318,7 @@ async def serve():
from opentelemetry.instrumentation.grpc.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.schemas import Schemas

# pylint:disable=import-outside-toplevel
# pylint:disable=import-self
Expand Down Expand Up @@ -482,15 +505,17 @@ def _uninstrument(self, **kwargs):
def wrapper_fn(self, original_func, instance, args, kwargs):
channel = original_func(*args, **kwargs)
tracer_provider = kwargs.get("tracer_provider")
request_hook = self._request_hook
response_hook = self._response_hook
target = args[0] if args else kwargs.get("target", "")
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: are there other ways to pass target /host:port to public api?

host, port = _parse_grpc_target(target)
return intercept_channel(
channel,
client_interceptor(
tracer_provider=tracer_provider,
filter_=self._filter,
request_hook=request_hook,
response_hook=response_hook,
request_hook=self._request_hook,
response_hook=self._response_hook,
host=host,
port=port,
),
)

Expand Down Expand Up @@ -522,26 +547,21 @@ def __init__(self, filter_=None):
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _add_interceptors(self, tracer_provider, kwargs):
def _add_interceptors(self, tracer_provider, args, kwargs):
target = args[0] if args else kwargs.get("target", "")
host, port = _parse_grpc_target(target)
ours = aio_client_interceptors(
tracer_provider=tracer_provider,
filter_=self._filter,
request_hook=self._request_hook,
response_hook=self._response_hook,
host=host,
port=port,
)
if "interceptors" in kwargs and kwargs["interceptors"]:
kwargs["interceptors"] = list(kwargs["interceptors"])
kwargs["interceptors"] = (
aio_client_interceptors(
tracer_provider=tracer_provider,
filter_=self._filter,
request_hook=self._request_hook,
response_hook=self._response_hook,
)
+ kwargs["interceptors"]
)
kwargs["interceptors"] = ours + list(kwargs["interceptors"])
else:
kwargs["interceptors"] = aio_client_interceptors(
tracer_provider=tracer_provider,
filter_=self._filter,
request_hook=self._request_hook,
response_hook=self._response_hook,
)

kwargs["interceptors"] = ours
return kwargs

def _instrument(self, **kwargs):
Expand All @@ -552,13 +572,11 @@ def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")

def insecure(*args, **kwargs):
kwargs = self._add_interceptors(tracer_provider, kwargs)

kwargs = self._add_interceptors(tracer_provider, args, kwargs)
return self._original_insecure(*args, **kwargs)

def secure(*args, **kwargs):
kwargs = self._add_interceptors(tracer_provider, kwargs)

kwargs = self._add_interceptors(tracer_provider, args, kwargs)
return self._original_secure(*args, **kwargs)

grpc.aio.insecure_channel = insecure
Expand All @@ -570,7 +588,8 @@ def _uninstrument(self, **kwargs):


def client_interceptor(
tracer_provider=None, filter_=None, request_hook=None, response_hook=None
tracer_provider=None, filter_=None, request_hook=None, response_hook=None,
host=None, port=None,
):
"""Create a gRPC client channel interceptor.

Expand All @@ -581,23 +600,37 @@ def client_interceptor(
matches the condition. Default is None and intercept
all requests.

host: Server hostname parsed from the channel target. Used to set
``server.address`` / ``net.peer.name`` on client spans.

port: Server port parsed from the channel target. Used to set
``server.port`` / ``net.peer.port`` on client spans.

Returns:
An invocation-side interceptor object.
"""
from . import _client # noqa: PLC0415

_OpenTelemetrySemanticConventionStability._initialize()
sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.RPC
)

tracer = trace.get_tracer(
__name__,
__version__,
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode),
)

return _client.OpenTelemetryClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
sem_conv_opt_in_mode=sem_conv_opt_in_mode,
host=host,
port=port,
)


Expand All @@ -616,61 +649,65 @@ def server_interceptor(tracer_provider=None, filter_=None):
"""
from . import _server # noqa: PLC0415

_OpenTelemetrySemanticConventionStability._initialize()
sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.RPC
)

tracer = trace.get_tracer(
__name__,
__version__,
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode),
)

return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_)
return _server.OpenTelemetryServerInterceptor(
tracer, filter_=filter_, sem_conv_opt_in_mode=sem_conv_opt_in_mode
)


def aio_client_interceptors(
tracer_provider=None, filter_=None, request_hook=None, response_hook=None
tracer_provider=None, filter_=None, request_hook=None, response_hook=None,
host=None, port=None,
):
"""Create a gRPC client channel interceptor.

Args:
tracer: The tracer to use to create client-side spans.

host: Server hostname parsed from the channel target.
port: Server port parsed from the channel target.

Returns:
An invocation-side interceptor object.
"""
from . import _aio_client # noqa: PLC0415

_OpenTelemetrySemanticConventionStability._initialize()
sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.RPC
)

tracer = trace.get_tracer(
__name__,
__version__,
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode),
)

interceptor_kwargs = dict(
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
sem_conv_opt_in_mode=sem_conv_opt_in_mode,
host=host,
port=port,
)
return [
_aio_client.UnaryUnaryAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
),
_aio_client.UnaryStreamAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
),
_aio_client.StreamUnaryAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
),
_aio_client.StreamStreamAioClientInterceptor(
tracer,
filter_=filter_,
request_hook=request_hook,
response_hook=response_hook,
),
_aio_client.UnaryUnaryAioClientInterceptor(tracer, **interceptor_kwargs),
_aio_client.UnaryStreamAioClientInterceptor(tracer, **interceptor_kwargs),
_aio_client.StreamUnaryAioClientInterceptor(tracer, **interceptor_kwargs),
_aio_client.StreamStreamAioClientInterceptor(tracer, **interceptor_kwargs),
]


Expand All @@ -685,15 +722,20 @@ def aio_server_interceptor(tracer_provider=None, filter_=None):
"""
from . import _aio_server # noqa: PLC0415

_OpenTelemetrySemanticConventionStability._initialize()
sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.RPC
)

tracer = trace.get_tracer(
__name__,
__version__,
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url=_get_rpc_schema_url(sem_conv_opt_in_mode),
)

return _aio_server.OpenTelemetryAioServerInterceptor(
tracer, filter_=filter_
tracer, filter_=filter_, sem_conv_opt_in_mode=sem_conv_opt_in_mode
)


Expand All @@ -715,3 +757,9 @@ def _parse_services(excluded_services: str) -> List[str]:
else:
excluded_service_list = []
return excluded_service_list

def _get_rpc_schema_url(mode: _StabilityMode) -> str:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a similar helper in progress housed somewhere else, but it's mainly for instrumentors that export multiple types e.g. SQLAlchemy doing both HTTP and DATABASE. In distant future and if it makes sense, would be good to have just one.

if mode is _StabilityMode.DEFAULT:
return "https://opentelemetry.io/schemas/1.11.0"
# TODO: update to 1.40.0
return Schemas.V1_38_0.value
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo

Loading
Loading