Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,19 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction


def new_schedule_task_action(
id: int, name: str, encoded_input: Optional[str], router: Optional[pb.TaskRouter] = None
id: int,
name: str,
encoded_input: Optional[str],
router: Optional[pb.TaskRouter] = None,
task_execution_id: str = "",
) -> pb.OrchestratorAction:
return pb.OrchestratorAction(
id=id,
scheduleTask=pb.ScheduleTaskAction(
name=name,
input=get_string_value(encoded_input),
router=router,
taskExecutionId=task_execution_id,
),
router=router,
)
Expand Down
30 changes: 27 additions & 3 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,16 +396,24 @@ class RetryableTask(CompletableTask[T]):
def __init__(
self,
retry_policy: RetryPolicy,
action: pb.OrchestratorAction,
start_time: datetime,
is_sub_orch: bool,
task_name: str,
encoded_input: Optional[str] = None,
task_execution_id: str = "",
instance_id: Optional[str] = None,
app_id: Optional[str] = None,
) -> None:
super().__init__()
self._action = action
self._retry_policy = retry_policy
self._attempt_count = 1
self._start_time = start_time
self._is_sub_orch = is_sub_orch
self._task_name = task_name
self._encoded_input = encoded_input
self._task_execution_id = task_execution_id
self._instance_id = instance_id
self._app_id = app_id

def increment_attempt_count(self) -> None:
self._attempt_count += 1
Expand Down Expand Up @@ -479,9 +487,10 @@ def when_any(tasks: list[Task]) -> WhenAnyTask:


class ActivityContext:
def __init__(self, orchestration_id: str, task_id: int):
def __init__(self, orchestration_id: str, task_id: int, task_execution_id: str = ""):
self._orchestration_id = orchestration_id
self._task_id = task_id
self._task_execution_id = task_execution_id

@property
def orchestration_id(self) -> str:
Expand Down Expand Up @@ -510,6 +519,21 @@ def task_id(self) -> int:
"""
return self._task_id

@property
def task_execution_id(self) -> str:
"""Get the task execution ID associated with this activity invocation.

The task execution ID is a UUID that is stable across retry attempts
of the same activity call. It can be used for idempotency and
deduplication when an activity may be retried.

Returns
-------
str
The task execution ID for this activity invocation.
"""
return self._task_execution_id


# Orchestrators are generators that yield tasks and receive/return any type
Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task, Any, Any], TOutput]]
Expand Down
62 changes: 34 additions & 28 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,9 @@ def _execute_activity(
with span_context:
try:
executor = _ActivityExecutor(self._registry, self._logger)
result = executor.execute(instance_id, req.name, req.taskId, req.input.value)
result = executor.execute(
instance_id, req.name, req.taskId, req.input.value, req.taskExecutionId
)
res = pb.ActivityResponse(
instanceId=instance_id,
taskId=req.taskId,
Expand Down Expand Up @@ -1125,9 +1127,16 @@ def call_activity(
app_id: Optional[str] = None,
) -> task.Task[TOutput]:
id = self.next_sequence_number()
task_execution_id = str(self.new_guid())

self.call_activity_function_helper(
id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False, app_id=app_id
id,
activity,
input=input,
retry_policy=retry_policy,
is_sub_orch=False,
app_id=app_id,
task_execution_id=task_execution_id,
)
return self._pending_tasks.get(id, task.CompletableTask())

Expand Down Expand Up @@ -1167,6 +1176,7 @@ def call_activity_function_helper(
instance_id: Optional[str] = None,
fn_task: Optional[task.CompletableTask[TOutput]] = None,
app_id: Optional[str] = None,
task_execution_id: str = "",
):
if id is None:
id = self.next_sequence_number()
Expand All @@ -1180,16 +1190,17 @@ def call_activity_function_helper(
if fn_task is None:
encoded_input = shared.to_json(input) if input is not None else None
else:
# Here, we don't need to convert the input to JSON because it is already converted.
# We just need to take string representation of it.
encoded_input = str(input)
# When retrying, input is already encoded as a string (or None).
encoded_input = str(input) if input is not None else None
if not is_sub_orch:
name = (
activity_function
if isinstance(activity_function, str)
else task.get_name(activity_function)
)
action = ph.new_schedule_task_action(id, name, encoded_input, router)
action = ph.new_schedule_task_action(
id, name, encoded_input, router, task_execution_id=task_execution_id
)
else:
if instance_id is None:
# Create a deteministic instance ID based on the parent instance ID
Expand All @@ -1207,9 +1218,13 @@ def call_activity_function_helper(
else:
fn_task = task.RetryableTask[TOutput](
retry_policy=retry_policy,
action=action,
start_time=self.current_utc_datetime,
is_sub_orch=is_sub_orch,
task_name=name if not is_sub_orch else activity_function,
encoded_input=encoded_input,
task_execution_id=task_execution_id,
instance_id=instance_id,
app_id=app_id,
)
self._pending_tasks[id] = fn_task

Expand Down Expand Up @@ -1429,28 +1444,18 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
return
timer_task.complete(None)
if timer_task._retryable_parent is not None:
activity_action = timer_task._retryable_parent._action

if not timer_task._retryable_parent._is_sub_orch:
cur_task = activity_action.scheduleTask
instance_id = None
else:
cur_task = activity_action.createSubOrchestration
instance_id = cur_task.instanceId
if cur_task.router and cur_task.router.targetAppID:
target_app_id = cur_task.router.targetAppID
else:
target_app_id = None
retryable = timer_task._retryable_parent

ctx.call_activity_function_helper(
id=activity_action.id,
activity_function=cur_task.name,
input=cur_task.input.value,
retry_policy=timer_task._retryable_parent._retry_policy,
is_sub_orch=timer_task._retryable_parent._is_sub_orch,
instance_id=instance_id,
fn_task=timer_task._retryable_parent,
app_id=target_app_id,
id=None, # Get a new sequence number
activity_function=retryable._task_name,
input=retryable._encoded_input,
retry_policy=retryable._retry_policy,
is_sub_orch=retryable._is_sub_orch,
instance_id=retryable._instance_id,
fn_task=retryable,
app_id=retryable._app_id,
task_execution_id=retryable._task_execution_id,
)
else:
ctx.resume()
Expand Down Expand Up @@ -1682,6 +1687,7 @@ def execute(
name: str,
task_id: int,
encoded_input: Optional[str],
task_execution_id: str = "",
) -> Optional[str]:
"""Executes an activity function and returns the serialized result, if any."""
self._logger.debug(f"{orchestration_id}/{task_id}: Executing activity '{name}'...")
Expand All @@ -1692,7 +1698,7 @@ def execute(
)

activity_input = shared.from_json(encoded_input) if encoded_input else None
ctx = task.ActivityContext(orchestration_id, task_id)
ctx = task.ActivityContext(orchestration_id, task_id, task_execution_id)

# Execute the activity function
activity_output = fn(ctx, activity_input)
Expand Down
7 changes: 4 additions & 3 deletions tests/durabletask/test_orchestration_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,9 @@ def throw_activity_with_retry(ctx: task.ActivityContext, _):
assert state.runtime_status == client.OrchestrationStatus.FAILED
assert state.failure_details is not None
assert state.failure_details.error_type == "TaskFailedError"
assert state.failure_details.message.startswith("Sub-orchestration task #1 failed:")
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
assert "Sub-orchestration task #" in state.failure_details.message
assert "failed:" in state.failure_details.message
assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!")
assert state.failure_details.stack_trace is not None
assert throw_activity_counter == 9
assert child_orch_counter == 3
Expand Down Expand Up @@ -568,7 +569,7 @@ def throw_activity(ctx: task.ActivityContext, _):
assert state.runtime_status == client.OrchestrationStatus.FAILED
assert state.failure_details is not None
assert state.failure_details.error_type == "TaskFailedError"
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!")
assert state.failure_details.stack_trace is not None
assert throw_activity_counter == 4

Expand Down
7 changes: 4 additions & 3 deletions tests/durabletask/test_orchestration_e2e_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,9 @@ def throw_activity_with_retry(ctx: task.ActivityContext, _):
assert state.runtime_status == OrchestrationStatus.FAILED
assert state.failure_details is not None
assert state.failure_details.error_type == "TaskFailedError"
assert state.failure_details.message.startswith("Sub-orchestration task #1 failed:")
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
assert "Sub-orchestration task #" in state.failure_details.message
assert "failed:" in state.failure_details.message
assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!")
assert state.failure_details.stack_trace is not None
assert throw_activity_counter == 9
assert child_orch_counter == 3
Expand Down Expand Up @@ -455,7 +456,7 @@ def throw_activity(ctx: task.ActivityContext, _):
assert state.runtime_status == OrchestrationStatus.FAILED
assert state.failure_details is not None
assert state.failure_details.error_type == "TaskFailedError"
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!")
assert state.failure_details.stack_trace is not None
assert throw_activity_counter == 4

Expand Down
Loading
Loading