diff --git a/durabletask/internal/helpers.py b/durabletask/internal/helpers.py index 3f04728..501de75 100644 --- a/durabletask/internal/helpers.py +++ b/durabletask/internal/helpers.py @@ -204,7 +204,11 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction def new_schedule_task_action( - id: int, name: str, encoded_input: Optional[str], router: Optional[pb.TaskRouter] = None + id: int, + name: str, + encoded_input: Optional[str], + router: Optional[pb.TaskRouter] = None, + task_execution_id: str = "", ) -> pb.OrchestratorAction: return pb.OrchestratorAction( id=id, @@ -212,6 +216,7 @@ def new_schedule_task_action( name=name, input=get_string_value(encoded_input), router=router, + taskExecutionId=task_execution_id, ), router=router, ) diff --git a/durabletask/task.py b/durabletask/task.py index 3eaf9a2..83750ff 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -396,16 +396,24 @@ class RetryableTask(CompletableTask[T]): def __init__( self, retry_policy: RetryPolicy, - action: pb.OrchestratorAction, start_time: datetime, is_sub_orch: bool, + task_name: str, + encoded_input: Optional[str] = None, + task_execution_id: str = "", + instance_id: Optional[str] = None, + app_id: Optional[str] = None, ) -> None: super().__init__() - self._action = action self._retry_policy = retry_policy self._attempt_count = 1 self._start_time = start_time self._is_sub_orch = is_sub_orch + self._task_name = task_name + self._encoded_input = encoded_input + self._task_execution_id = task_execution_id + self._instance_id = instance_id + self._app_id = app_id def increment_attempt_count(self) -> None: self._attempt_count += 1 @@ -479,9 +487,10 @@ def when_any(tasks: list[Task]) -> WhenAnyTask: class ActivityContext: - def __init__(self, orchestration_id: str, task_id: int): + def __init__(self, orchestration_id: str, task_id: int, task_execution_id: str = ""): self._orchestration_id = orchestration_id self._task_id = task_id + self._task_execution_id = task_execution_id @property def orchestration_id(self) -> str: @@ -510,6 +519,21 @@ def task_id(self) -> int: """ return self._task_id + @property + def task_execution_id(self) -> str: + """Get the task execution ID associated with this activity invocation. + + The task execution ID is a UUID that is stable across retry attempts + of the same activity call. It can be used for idempotency and + deduplication when an activity may be retried. + + Returns + ------- + str + The task execution ID for this activity invocation. + """ + return self._task_execution_id + # Orchestrators are generators that yield tasks and receive/return any type Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task, Any, Any], TOutput]] diff --git a/durabletask/worker.py b/durabletask/worker.py index 165b98c..13f13d8 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -888,7 +888,9 @@ def _execute_activity( with span_context: try: executor = _ActivityExecutor(self._registry, self._logger) - result = executor.execute(instance_id, req.name, req.taskId, req.input.value) + result = executor.execute( + instance_id, req.name, req.taskId, req.input.value, req.taskExecutionId + ) res = pb.ActivityResponse( instanceId=instance_id, taskId=req.taskId, @@ -1125,9 +1127,16 @@ def call_activity( app_id: Optional[str] = None, ) -> task.Task[TOutput]: id = self.next_sequence_number() + task_execution_id = str(self.new_guid()) self.call_activity_function_helper( - id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False, app_id=app_id + id, + activity, + input=input, + retry_policy=retry_policy, + is_sub_orch=False, + app_id=app_id, + task_execution_id=task_execution_id, ) return self._pending_tasks.get(id, task.CompletableTask()) @@ -1167,6 +1176,7 @@ def call_activity_function_helper( instance_id: Optional[str] = None, fn_task: Optional[task.CompletableTask[TOutput]] = None, app_id: Optional[str] = None, + task_execution_id: str = "", ): if id is None: id = self.next_sequence_number() @@ -1180,16 +1190,17 @@ def call_activity_function_helper( if fn_task is None: encoded_input = shared.to_json(input) if input is not None else None else: - # Here, we don't need to convert the input to JSON because it is already converted. - # We just need to take string representation of it. - encoded_input = str(input) + # When retrying, input is already encoded as a string (or None). + encoded_input = str(input) if input is not None else None if not is_sub_orch: name = ( activity_function if isinstance(activity_function, str) else task.get_name(activity_function) ) - action = ph.new_schedule_task_action(id, name, encoded_input, router) + action = ph.new_schedule_task_action( + id, name, encoded_input, router, task_execution_id=task_execution_id + ) else: if instance_id is None: # Create a deteministic instance ID based on the parent instance ID @@ -1207,9 +1218,13 @@ def call_activity_function_helper( else: fn_task = task.RetryableTask[TOutput]( retry_policy=retry_policy, - action=action, start_time=self.current_utc_datetime, is_sub_orch=is_sub_orch, + task_name=name if not is_sub_orch else activity_function, + encoded_input=encoded_input, + task_execution_id=task_execution_id, + instance_id=instance_id, + app_id=app_id, ) self._pending_tasks[id] = fn_task @@ -1429,28 +1444,18 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven return timer_task.complete(None) if timer_task._retryable_parent is not None: - activity_action = timer_task._retryable_parent._action - - if not timer_task._retryable_parent._is_sub_orch: - cur_task = activity_action.scheduleTask - instance_id = None - else: - cur_task = activity_action.createSubOrchestration - instance_id = cur_task.instanceId - if cur_task.router and cur_task.router.targetAppID: - target_app_id = cur_task.router.targetAppID - else: - target_app_id = None + retryable = timer_task._retryable_parent ctx.call_activity_function_helper( - id=activity_action.id, - activity_function=cur_task.name, - input=cur_task.input.value, - retry_policy=timer_task._retryable_parent._retry_policy, - is_sub_orch=timer_task._retryable_parent._is_sub_orch, - instance_id=instance_id, - fn_task=timer_task._retryable_parent, - app_id=target_app_id, + id=None, # Get a new sequence number + activity_function=retryable._task_name, + input=retryable._encoded_input, + retry_policy=retryable._retry_policy, + is_sub_orch=retryable._is_sub_orch, + instance_id=retryable._instance_id, + fn_task=retryable, + app_id=retryable._app_id, + task_execution_id=retryable._task_execution_id, ) else: ctx.resume() @@ -1682,6 +1687,7 @@ def execute( name: str, task_id: int, encoded_input: Optional[str], + task_execution_id: str = "", ) -> Optional[str]: """Executes an activity function and returns the serialized result, if any.""" self._logger.debug(f"{orchestration_id}/{task_id}: Executing activity '{name}'...") @@ -1692,7 +1698,7 @@ def execute( ) activity_input = shared.from_json(encoded_input) if encoded_input else None - ctx = task.ActivityContext(orchestration_id, task_id) + ctx = task.ActivityContext(orchestration_id, task_id, task_execution_id) # Execute the activity function activity_output = fn(ctx, activity_input) diff --git a/tests/durabletask/test_orchestration_e2e.py b/tests/durabletask/test_orchestration_e2e.py index 181d71d..f3cd56c 100644 --- a/tests/durabletask/test_orchestration_e2e.py +++ b/tests/durabletask/test_orchestration_e2e.py @@ -500,8 +500,9 @@ def throw_activity_with_retry(ctx: task.ActivityContext, _): assert state.runtime_status == client.OrchestrationStatus.FAILED assert state.failure_details is not None assert state.failure_details.error_type == "TaskFailedError" - assert state.failure_details.message.startswith("Sub-orchestration task #1 failed:") - assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!") + assert "Sub-orchestration task #" in state.failure_details.message + assert "failed:" in state.failure_details.message + assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!") assert state.failure_details.stack_trace is not None assert throw_activity_counter == 9 assert child_orch_counter == 3 @@ -568,7 +569,7 @@ def throw_activity(ctx: task.ActivityContext, _): assert state.runtime_status == client.OrchestrationStatus.FAILED assert state.failure_details is not None assert state.failure_details.error_type == "TaskFailedError" - assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!") + assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!") assert state.failure_details.stack_trace is not None assert throw_activity_counter == 4 diff --git a/tests/durabletask/test_orchestration_e2e_async.py b/tests/durabletask/test_orchestration_e2e_async.py index b71e70b..b2f3003 100644 --- a/tests/durabletask/test_orchestration_e2e_async.py +++ b/tests/durabletask/test_orchestration_e2e_async.py @@ -414,8 +414,9 @@ def throw_activity_with_retry(ctx: task.ActivityContext, _): assert state.runtime_status == OrchestrationStatus.FAILED assert state.failure_details is not None assert state.failure_details.error_type == "TaskFailedError" - assert state.failure_details.message.startswith("Sub-orchestration task #1 failed:") - assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!") + assert "Sub-orchestration task #" in state.failure_details.message + assert "failed:" in state.failure_details.message + assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!") assert state.failure_details.stack_trace is not None assert throw_activity_counter == 9 assert child_orch_counter == 3 @@ -455,7 +456,7 @@ def throw_activity(ctx: task.ActivityContext, _): assert state.runtime_status == OrchestrationStatus.FAILED assert state.failure_details is not None assert state.failure_details.error_type == "TaskFailedError" - assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!") + assert state.failure_details.message.endswith("failed: Kah-BOOOOM!!!") assert state.failure_details.stack_trace is not None assert throw_activity_counter == 4 diff --git a/tests/durabletask/test_orchestration_executor.py b/tests/durabletask/test_orchestration_executor.py index bf81f26..9dc8ec3 100644 --- a/tests/durabletask/test_orchestration_executor.py +++ b/tests/durabletask/test_orchestration_executor.py @@ -313,7 +313,19 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): def test_activity_retry_policies(): - """Tests the retry policy logic for activity tasks""" + """Tests the retry policy logic for activity tasks. + + Each retry attempt gets a NEW sequence number (event ID). The + taskExecutionId remains the same across all retry attempts. + + Sequence of IDs: + Attempt 1: scheduleTask(id=1) + Retry timer 1: createTimer(id=2) + Attempt 2: scheduleTask(id=3) + Retry timer 2: createTimer(id=4) + Attempt 3: scheduleTask(id=5) + ... and so on + """ def dummy_activity(ctx, _): raise ValueError("Kah-BOOOOM!!!") @@ -336,7 +348,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): name = registry.add_orchestrator(orchestrator) current_timestamp = datetime.utcnow() - # Simulate the task failing for the first time and confirm that a timer is scheduled for 1 second in the future + + # --- Attempt 1: scheduleTask(id=1) fails --- old_events = [ helpers.new_orchestrator_started_event(timestamp=current_timestamp), helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None), @@ -356,151 +369,169 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at assert actions[0].id == 2 - # Simulate the timer firing at the expected time and confirm that another activity task is scheduled + # --- Timer fires, retry schedules scheduleTask(id=3) --- current_timestamp = expected_fire_at old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), + helpers.new_timer_created_event(2, current_timestamp), helpers.new_timer_fired_event(2, current_timestamp), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 2 - assert actions[1].HasField("scheduleTask") - assert actions[1].id == 1 + assert len(actions) == 1 + assert actions[0].HasField("scheduleTask") + assert actions[0].id == 3 # NEW sequence number for retry + # Capture the taskExecutionId from first retry - it must be non-empty + # and consistent across ALL retry attempts + retry_task_execution_id = actions[0].scheduleTask.taskExecutionId + assert retry_task_execution_id != "", "taskExecutionId must be non-empty" - # Simulate the task failing for the second time and confirm that a timer is scheduled for 2 seconds in the future + # --- Attempt 2: scheduleTask(id=3) fails --- old_events = old_events + new_events expected_fire_at = current_timestamp + timedelta(seconds=2) new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!")), + helpers.new_task_scheduled_event(3, task.get_name(dummy_activity)), + helpers.new_task_failed_event(3, ValueError("Kah-BOOOOM!!!")), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 3 - assert actions[2].HasField("createTimer") - assert actions[2].createTimer.fireAt.ToDatetime() == expected_fire_at - assert actions[2].id == 3 + assert len(actions) == 1 + assert actions[0].HasField("createTimer") + assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at + assert actions[0].id == 4 - # Simulate the timer firing at the expected time and confirm that another activity task is scheduled + # --- Timer fires, retry schedules scheduleTask(id=5) --- current_timestamp = expected_fire_at old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_timer_fired_event(3, current_timestamp), + helpers.new_timer_created_event(4, current_timestamp), + helpers.new_timer_fired_event(4, current_timestamp), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 3 - assert actions[1].HasField("scheduleTask") - assert actions[1].id == 1 + assert len(actions) == 1 + assert actions[0].HasField("scheduleTask") + assert actions[0].id == 5 + assert actions[0].scheduleTask.taskExecutionId == retry_task_execution_id - # Simulate the task failing for a third time and confirm that a timer is scheduled for 4 seconds in the future + # --- Attempt 3: scheduleTask(id=5) fails --- expected_fire_at = current_timestamp + timedelta(seconds=4) old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!")), + helpers.new_task_scheduled_event(5, task.get_name(dummy_activity)), + helpers.new_task_failed_event(5, ValueError("Kah-BOOOOM!!!")), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 4 - assert actions[3].HasField("createTimer") - assert actions[3].createTimer.fireAt.ToDatetime() == expected_fire_at - assert actions[3].id == 4 + assert len(actions) == 1 + assert actions[0].HasField("createTimer") + assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at + assert actions[0].id == 6 - # Simulate the timer firing at the expected time and confirm that another activity task is scheduled + # --- Timer fires, retry schedules scheduleTask(id=7) --- current_timestamp = expected_fire_at old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_timer_fired_event(4, current_timestamp), + helpers.new_timer_created_event(6, current_timestamp), + helpers.new_timer_fired_event(6, current_timestamp), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 4 - assert actions[1].HasField("scheduleTask") - assert actions[1].id == 1 + assert len(actions) == 1 + assert actions[0].HasField("scheduleTask") + assert actions[0].id == 7 + assert actions[0].scheduleTask.taskExecutionId == retry_task_execution_id - # Simulate the task failing for a fourth time and confirm that a timer is scheduled for 8 seconds in the future + # --- Attempt 4: scheduleTask(id=7) fails --- expected_fire_at = current_timestamp + timedelta(seconds=8) old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!")), + helpers.new_task_scheduled_event(7, task.get_name(dummy_activity)), + helpers.new_task_failed_event(7, ValueError("Kah-BOOOOM!!!")), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 5 - assert actions[4].HasField("createTimer") - assert actions[4].createTimer.fireAt.ToDatetime() == expected_fire_at - assert actions[4].id == 5 + assert len(actions) == 1 + assert actions[0].HasField("createTimer") + assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at + assert actions[0].id == 8 - # Simulate the timer firing at the expected time and confirm that another activity task is scheduled + # --- Timer fires, retry schedules scheduleTask(id=9) --- current_timestamp = expected_fire_at old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_timer_fired_event(5, current_timestamp), + helpers.new_timer_created_event(8, current_timestamp), + helpers.new_timer_fired_event(8, current_timestamp), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 5 - assert actions[1].HasField("scheduleTask") - assert actions[1].id == 1 + assert len(actions) == 1 + assert actions[0].HasField("scheduleTask") + assert actions[0].id == 9 + assert actions[0].scheduleTask.taskExecutionId == retry_task_execution_id - # Simulate the task failing for a fifth time and confirm that a timer is scheduled for 10 seconds in the future. - # This time, the timer will fire after 10 seconds, instead of 16, as max_retry_interval is set to 10 seconds. + # --- Attempt 5: scheduleTask(id=9) fails --- + # max_retry_interval caps at 10 seconds (instead of 16) expected_fire_at = current_timestamp + timedelta(seconds=10) old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!")), + helpers.new_task_scheduled_event(9, task.get_name(dummy_activity)), + helpers.new_task_failed_event(9, ValueError("Kah-BOOOOM!!!")), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 6 - assert actions[5].HasField("createTimer") - assert actions[5].createTimer.fireAt.ToDatetime() == expected_fire_at - assert actions[5].id == 6 + assert len(actions) == 1 + assert actions[0].HasField("createTimer") + assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at + assert actions[0].id == 10 - # Simulate the timer firing at the expected time and confirm that another activity task is scheduled + # --- Timer fires, retry schedules scheduleTask(id=11) --- current_timestamp = expected_fire_at old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_timer_fired_event(6, current_timestamp), + helpers.new_timer_created_event(10, current_timestamp), + helpers.new_timer_fired_event(10, current_timestamp), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 6 - assert actions[1].HasField("scheduleTask") - assert actions[1].id == 1 + assert len(actions) == 1 + assert actions[0].HasField("scheduleTask") + assert actions[0].id == 11 + assert actions[0].scheduleTask.taskExecutionId == retry_task_execution_id - # Simulate the task failing for a sixth time and confirm that orchestration is marked as failed finally. + # --- Attempt 6: scheduleTask(id=11) fails - max attempts exhausted --- old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!")), + helpers.new_task_scheduled_event(11, task.get_name(dummy_activity)), + helpers.new_task_failed_event(11, ValueError("Kah-BOOOOM!!!")), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions assert len(actions) == 1 assert actions[0].completeOrchestration.failureDetails.errorMessage.__contains__( - "Activity task #1 failed: Kah-BOOOOM!!!" + "Activity task #11 failed: Kah-BOOOOM!!!" ) - assert actions[0].id == 7 + assert actions[0].id == 12 def test_nondeterminism_expected_timer(): @@ -1297,29 +1328,31 @@ def orchestrator(ctx: task.OrchestrationContext, _): old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), + helpers.new_timer_created_event(3, current_timestamp), helpers.new_timer_fired_event(3, current_timestamp), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 2 - assert actions[1].HasField("scheduleTask") - assert actions[1].id == 1 + assert len(actions) == 1 + assert actions[0].HasField("scheduleTask") + assert actions[0].id == 4 # New sequence number for retry # Simulate the task failing for the second time and confirm that a timer is scheduled for 2 seconds in the future old_events = old_events + new_events expected_fire_at = current_timestamp + timedelta(seconds=2) new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!")), + helpers.new_task_scheduled_event(4, task.get_name(dummy_activity)), + helpers.new_task_failed_event(4, ValueError("Kah-BOOOOM!!!")), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 3 - assert actions[2].HasField("createTimer") - assert actions[2].createTimer.fireAt.ToDatetime() == expected_fire_at - assert actions[2].id == 4 + assert len(actions) == 1 + assert actions[0].HasField("createTimer") + assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at + assert actions[0].id == 5 # Complete the "Seattle" task. We expect the orchestration to complete with output "Hello, Seattle!" encoded_output = json.dumps(dummy_activity(None, "Seattle")) @@ -1389,29 +1422,31 @@ def orchestrator(ctx: task.OrchestrationContext, _): old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), + helpers.new_timer_created_event(3, current_timestamp), helpers.new_timer_fired_event(3, current_timestamp), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 2 - assert actions[1].HasField("scheduleTask") - assert actions[1].id == 1 + assert len(actions) == 1 + assert actions[0].HasField("scheduleTask") + assert actions[0].id == 4 # New sequence number for retry # Simulate the task failing for the second time and confirm that a timer is scheduled for 5 seconds in the future old_events = old_events + new_events expected_fire_at = current_timestamp + timedelta(seconds=5) new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!")), + helpers.new_task_scheduled_event(4, task.get_name(dummy_activity)), + helpers.new_task_failed_event(4, ValueError("Kah-BOOOOM!!!")), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 3 - assert actions[2].HasField("createTimer") - assert actions[2].createTimer.fireAt.ToDatetime() == expected_fire_at - assert actions[2].id == 4 + assert len(actions) == 1 + assert actions[0].HasField("createTimer") + assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at + assert actions[0].id == 5 # Complete the "Seattle" task. # And, Simulate the timer firing at the expected time and confirm that another activity task is scheduled @@ -1419,14 +1454,15 @@ def orchestrator(ctx: task.OrchestrationContext, _): old_events = old_events + new_events new_events = [ helpers.new_task_completed_event(2, encoded_output), - helpers.new_timer_fired_event(4, current_timestamp), + helpers.new_timer_created_event(5, current_timestamp), + helpers.new_timer_fired_event(5, current_timestamp), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 3 - assert actions[1].HasField("scheduleTask") - assert actions[1].id == 1 + assert len(actions) == 1 + assert actions[0].HasField("scheduleTask") + assert actions[0].id == 6 # New sequence number for retry ex = ValueError("Kah-BOOOOM!!!") @@ -1434,7 +1470,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!")), + helpers.new_task_scheduled_event(6, task.get_name(dummy_activity)), + helpers.new_task_failed_event(6, ValueError("Kah-BOOOOM!!!")), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) @@ -1568,27 +1605,31 @@ def orchestrator(ctx: task.OrchestrationContext, _): current_timestamp = expected_fire_at new_events = [ helpers.new_orchestrator_started_event(current_timestamp), + helpers.new_timer_created_event(2, current_timestamp), helpers.new_timer_fired_event(2, current_timestamp), ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 2 # timer + rescheduled task - assert actions[1].HasField("scheduleTask") - assert actions[1].id == 1 + assert len(actions) == 1 # rescheduled task only (timer consumed) + assert actions[0].HasField("scheduleTask") + assert actions[0].id == 3 # New sequence number for retry # Second attempt also fails old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_task_failed_event(1, Exception("generic error")), + helpers.new_task_scheduled_event(3, task.get_name(dummy_activity)), + helpers.new_task_failed_event(3, Exception("generic error")), ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions # Should schedule another retry timer - assert len(actions) == 3 - assert actions[2].HasField("createTimer") - assert actions[2].id == 3 + assert len(actions) == 1 + assert actions[0].HasField("createTimer") + assert actions[0].id == 4 # Simulate the timer firing and activity being rescheduled expected_fire_at = current_timestamp + timedelta(seconds=1) @@ -1596,28 +1637,32 @@ def orchestrator(ctx: task.OrchestrationContext, _): current_timestamp = expected_fire_at new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_timer_fired_event(3, current_timestamp), + helpers.new_timer_created_event(4, current_timestamp), + helpers.new_timer_fired_event(4, current_timestamp), ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions - assert len(actions) == 3 # timer + rescheduled task - assert actions[1].HasField("scheduleTask") - assert actions[1].id == 1 + assert len(actions) == 1 # rescheduled task only (timer consumed) + assert actions[0].HasField("scheduleTask") + assert actions[0].id == 5 # New sequence number for retry # Third attempt fails - should exhaust retries old_events = old_events + new_events new_events = [ helpers.new_orchestrator_started_event(current_timestamp), - helpers.new_task_failed_event(1, Exception("generic error")), + helpers.new_task_scheduled_event(5, task.get_name(dummy_activity)), + helpers.new_task_failed_event(5, Exception("generic error")), ] + executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) actions = result.actions # Now should fail - no more retries complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED assert complete_action.failureDetails.errorMessage.__contains__( - "Activity task #1 failed: generic error" + "Activity task #5 failed: generic error" )