Skip to content

Commit feae9f7

Browse files
committed
PR feedback, instrument async client
1 parent ff13570 commit feae9f7

File tree

5 files changed

+30
-12
lines changed

5 files changed

+30
-12
lines changed

durabletask/client.py

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,13 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
190190
reuse_id_policy=reuse_id_policy, tags=tags,
191191
version=version if version else self.default_version)
192192

193+
# Inject the active PRODUCER span context into the request so the sidecar
194+
# stores it in the executionStarted event and the worker can parent all
195+
# orchestration/activity/timer spans under this trace.
196+
parent_trace_ctx = tracing.get_current_trace_context()
197+
if parent_trace_ctx is not None:
198+
req.parentTraceContext.CopyFrom(parent_trace_ctx)
199+
193200
self._logger.info(f"Starting new '{req.name}' instance with ID = '{req.instanceId}'.")
194201
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
195202
return res.instanceId
@@ -427,14 +434,25 @@ async def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator
427434
tags: Optional[dict[str, str]] = None,
428435
version: Optional[str] = None) -> str:
429436

430-
req = build_schedule_new_orchestration_req(
431-
orchestrator, input=input, instance_id=instance_id, start_at=start_at,
432-
reuse_id_policy=reuse_id_policy, tags=tags,
433-
version=version if version else self.default_version)
437+
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
438+
resolved_instance_id = instance_id if instance_id else uuid.uuid4().hex
439+
resolved_version = version if version else self.default_version
434440

435-
self._logger.info(f"Starting new '{req.name}' instance with ID = '{req.instanceId}'.")
436-
res: pb.CreateInstanceResponse = await self._stub.StartInstance(req)
437-
return res.instanceId
441+
with tracing.start_create_orchestration_span(
442+
name, resolved_instance_id, version=resolved_version,
443+
):
444+
req = build_schedule_new_orchestration_req(
445+
orchestrator, input=input, instance_id=instance_id, start_at=start_at,
446+
reuse_id_policy=reuse_id_policy, tags=tags,
447+
version=version if version else self.default_version)
448+
449+
parent_trace_ctx = tracing.get_current_trace_context()
450+
if parent_trace_ctx is not None:
451+
req.parentTraceContext.CopyFrom(parent_trace_ctx)
452+
453+
self._logger.info(f"Starting new '{req.name}' instance with ID = '{req.instanceId}'.")
454+
res: pb.CreateInstanceResponse = await self._stub.StartInstance(req)
455+
return res.instanceId
438456

439457
async def get_orchestration_state(self, instance_id: str, *,
440458
fetch_payloads: bool = True) -> Optional[OrchestrationState]:
@@ -496,10 +514,10 @@ async def wait_for_orchestration_completion(self, instance_id: str, *,
496514

497515
async def raise_orchestration_event(self, instance_id: str, event_name: str, *,
498516
data: Optional[Any] = None) -> None:
499-
req = build_raise_event_req(instance_id, event_name, data)
500-
501-
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
502-
await self._stub.RaiseEvent(req)
517+
with tracing.start_raise_event_span(event_name, instance_id):
518+
req = build_raise_event_req(instance_id, event_name, data)
519+
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
520+
await self._stub.RaiseEvent(req)
503521

504522
async def terminate_orchestration(self, instance_id: str, *,
505523
output: Optional[Any] = None,

examples/distributed-tracing/app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def weather_report_orchestrator(ctx: task.OrchestrationContext, cities: list):
9797
3. Call an activity to summarize the results.
9898
"""
9999
# Step 1 — Timer: wait briefly before starting work
100-
yield ctx.create_timer(timedelta(seconds=2))
100+
yield ctx.create_timer(timedelta(milliseconds=100))
101101
if not ctx.is_replaying:
102102
print(" [Orchestrator] Timer fired — starting weather collection")
103103

-3.05 KB
Loading
10.3 KB
Loading
32.7 KB
Loading

0 commit comments

Comments
 (0)