Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 6b4a726

Browse files
authored
Merge pull request #41 from acroca/otel
Hook otel instrumentation when available
2 parents 3618423 + 73c141e commit 6b4a726

3 files changed

Lines changed: 63 additions & 23 deletions

File tree

durabletask/aio/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@
2424
new_orchestration_state,
2525
)
2626

27+
# If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor
28+
try:
29+
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
30+
GrpcInstrumentorClient().instrument()
31+
except ImportError:
32+
pass
33+
2734

2835
class AsyncTaskHubGrpcClient:
2936
def __init__(

durabletask/client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@
2121
TInput = TypeVar("TInput")
2222
TOutput = TypeVar("TOutput")
2323

24+
# If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor
25+
try:
26+
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
27+
GrpcInstrumentorClient().instrument()
28+
except ImportError:
29+
pass
2430

2531
class OrchestrationStatus(Enum):
2632
"""The status of an orchestration instance."""

durabletask/worker.py

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Licensed under the MIT License.
33

44
import asyncio
5+
import contextlib
56
import inspect
67
import logging
78
import os
@@ -26,6 +27,17 @@
2627
TInput = TypeVar("TInput")
2728
TOutput = TypeVar("TOutput")
2829

30+
# If `opentelemetry-sdk` is available, enable the tracer
31+
try:
32+
from opentelemetry import trace
33+
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
34+
35+
otel_propagator = TraceContextTextMapPropagator()
36+
otel_tracer = trace.get_tracer(__name__)
37+
except ImportError:
38+
otel_tracer = None
39+
40+
2941

3042
class VersionNotRegisteredException(Exception):
3143
pass
@@ -839,31 +851,46 @@ def _execute_activity(
839851
completionToken,
840852
):
841853
instance_id = req.orchestrationInstance.instanceId
842-
try:
843-
executor = _ActivityExecutor(self._registry, self._logger)
844-
result = executor.execute(instance_id, req.name, req.taskId, req.input.value)
845-
res = pb.ActivityResponse(
846-
instanceId=instance_id,
847-
taskId=req.taskId,
848-
result=ph.get_string_value(result),
849-
completionToken=completionToken,
850-
)
851-
except Exception as ex:
852-
res = pb.ActivityResponse(
853-
instanceId=instance_id,
854-
taskId=req.taskId,
855-
failureDetails=ph.new_failure_details(ex),
856-
completionToken=completionToken,
857-
)
858854

859-
try:
860-
stub.CompleteActivityTask(res)
861-
except grpc.RpcError as rpc_error: # type: ignore
862-
self._handle_grpc_execution_error(rpc_error, "activity")
863-
except Exception as ex:
864-
self._logger.exception(
865-
f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}"
855+
if otel_tracer is not None:
856+
span_context = otel_tracer.start_as_current_span(
857+
name=f'activity: {req.name}',
858+
context=otel_propagator.extract(carrier={"traceparent": req.parentTraceContext.traceParent}),
859+
attributes={
860+
"durabletask.task.instance_id": instance_id,
861+
"durabletask.task.id": req.taskId,
862+
"durabletask.activity.name": req.name,
863+
}
866864
)
865+
else:
866+
span_context = contextlib.nullcontext()
867+
868+
with span_context:
869+
try:
870+
executor = _ActivityExecutor(self._registry, self._logger)
871+
result = executor.execute(instance_id, req.name, req.taskId, req.input.value)
872+
res = pb.ActivityResponse(
873+
instanceId=instance_id,
874+
taskId=req.taskId,
875+
result=ph.get_string_value(result),
876+
completionToken=completionToken,
877+
)
878+
except Exception as ex:
879+
res = pb.ActivityResponse(
880+
instanceId=instance_id,
881+
taskId=req.taskId,
882+
failureDetails=ph.new_failure_details(ex),
883+
completionToken=completionToken,
884+
)
885+
886+
try:
887+
stub.CompleteActivityTask(res)
888+
except grpc.RpcError as rpc_error: # type: ignore
889+
self._handle_grpc_execution_error(rpc_error, "activity")
890+
except Exception as ex:
891+
self._logger.exception(
892+
f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}"
893+
)
867894

868895

869896
class _RuntimeOrchestrationContext(

0 commit comments

Comments
 (0)