Skip to content

Commit f95a23a

Browse files
committed
Initial implementation
1 parent 9a909d2 commit f95a23a

File tree

8 files changed

+775
-158
lines changed

8 files changed

+775
-158
lines changed

durabletask/client.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,26 @@ def resume_orchestration(self, instance_id: str):
299299
self._logger.info(f"Resuming instance '{instance_id}'.")
300300
self._stub.ResumeInstance(req)
301301

302+
def rewind_orchestration(self, instance_id: str, *,
                         reason: Optional[str] = None):
    """Ask the backend to rewind a failed orchestration instance.

    A rewind strips failed task and sub-orchestration results out of the
    orchestration history and replays the orchestration from its last
    successful checkpoint. Activities that already succeeded are not
    executed again; only the failed work is retried.

    Args:
        instance_id: The ID of the orchestration instance to rewind.
        reason: Optional free-form text describing why the orchestration
            is being rewound.
    """
    # Wrap the optional reason first so the request construction below
    # stays a flat list of fields.
    wrapped_reason = helpers.get_string_value(reason)
    request = pb.RewindInstanceRequest(
        instanceId=instance_id,
        reason=wrapped_reason,
    )

    self._logger.info(f"Rewinding instance '{instance_id}'.")
    self._stub.RewindInstance(request)
321+
302322
def restart_orchestration(self, instance_id: str, *,
303323
restart_with_new_instance_id: bool = False) -> str:
304324
"""Restarts an existing orchestration instance.
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
443b333f4f65a438dc9eb4f090560d232afec4b7
22
fd9369c6a03d6af4e95285e432b7c4e943c06970
3-
026329c53fe6363985655857b9ca848ec7238bd2
3+
026329c53fe6363985655857b9ca848ec7238bd2
4+
57930bf659bb4c90cfeae44eaf465e000a67ecf1 // DO NOT MERGE - FEATURE BRANCH

durabletask/internal/orchestrator_service_pb2.py

Lines changed: 159 additions & 149 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

durabletask/internal/orchestrator_service_pb2.pyi

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -593,8 +593,14 @@ class SendEntityMessageAction(_message.Message):
593593
entityUnlockSent: EntityUnlockSentEvent
594594
def __init__(self, entityOperationSignaled: _Optional[_Union[EntityOperationSignaledEvent, _Mapping]] = ..., entityOperationCalled: _Optional[_Union[EntityOperationCalledEvent, _Mapping]] = ..., entityLockRequested: _Optional[_Union[EntityLockRequestedEvent, _Mapping]] = ..., entityUnlockSent: _Optional[_Union[EntityUnlockSentEvent, _Mapping]] = ...) -> None: ...
595595

596+
class RewindOrchestrationAction(_message.Message):
597+
__slots__ = ("newHistory",)
598+
NEWHISTORY_FIELD_NUMBER: _ClassVar[int]
599+
newHistory: _containers.RepeatedCompositeFieldContainer[HistoryEvent]
600+
def __init__(self, newHistory: _Optional[_Iterable[_Union[HistoryEvent, _Mapping]]] = ...) -> None: ...
601+
596602
class OrchestratorAction(_message.Message):
597-
__slots__ = ("id", "scheduleTask", "createSubOrchestration", "createTimer", "sendEvent", "completeOrchestration", "terminateOrchestration", "sendEntityMessage")
603+
__slots__ = ("id", "scheduleTask", "createSubOrchestration", "createTimer", "sendEvent", "completeOrchestration", "terminateOrchestration", "sendEntityMessage", "rewindOrchestration")
598604
ID_FIELD_NUMBER: _ClassVar[int]
599605
SCHEDULETASK_FIELD_NUMBER: _ClassVar[int]
600606
CREATESUBORCHESTRATION_FIELD_NUMBER: _ClassVar[int]
@@ -603,6 +609,7 @@ class OrchestratorAction(_message.Message):
603609
COMPLETEORCHESTRATION_FIELD_NUMBER: _ClassVar[int]
604610
TERMINATEORCHESTRATION_FIELD_NUMBER: _ClassVar[int]
605611
SENDENTITYMESSAGE_FIELD_NUMBER: _ClassVar[int]
612+
REWINDORCHESTRATION_FIELD_NUMBER: _ClassVar[int]
606613
id: int
607614
scheduleTask: ScheduleTaskAction
608615
createSubOrchestration: CreateSubOrchestrationAction
@@ -611,7 +618,8 @@ class OrchestratorAction(_message.Message):
611618
completeOrchestration: CompleteOrchestrationAction
612619
terminateOrchestration: TerminateOrchestrationAction
613620
sendEntityMessage: SendEntityMessageAction
614-
def __init__(self, id: _Optional[int] = ..., scheduleTask: _Optional[_Union[ScheduleTaskAction, _Mapping]] = ..., createSubOrchestration: _Optional[_Union[CreateSubOrchestrationAction, _Mapping]] = ..., createTimer: _Optional[_Union[CreateTimerAction, _Mapping]] = ..., sendEvent: _Optional[_Union[SendEventAction, _Mapping]] = ..., completeOrchestration: _Optional[_Union[CompleteOrchestrationAction, _Mapping]] = ..., terminateOrchestration: _Optional[_Union[TerminateOrchestrationAction, _Mapping]] = ..., sendEntityMessage: _Optional[_Union[SendEntityMessageAction, _Mapping]] = ...) -> None: ...
621+
rewindOrchestration: RewindOrchestrationAction
622+
def __init__(self, id: _Optional[int] = ..., scheduleTask: _Optional[_Union[ScheduleTaskAction, _Mapping]] = ..., createSubOrchestration: _Optional[_Union[CreateSubOrchestrationAction, _Mapping]] = ..., createTimer: _Optional[_Union[CreateTimerAction, _Mapping]] = ..., sendEvent: _Optional[_Union[SendEventAction, _Mapping]] = ..., completeOrchestration: _Optional[_Union[CompleteOrchestrationAction, _Mapping]] = ..., terminateOrchestration: _Optional[_Union[TerminateOrchestrationAction, _Mapping]] = ..., sendEntityMessage: _Optional[_Union[SendEntityMessageAction, _Mapping]] = ..., rewindOrchestration: _Optional[_Union[RewindOrchestrationAction, _Mapping]] = ...) -> None: ...
615623

616624
class OrchestrationTraceContext(_message.Message):
617625
__slots__ = ("spanID", "spanStartTime")
@@ -1250,16 +1258,50 @@ class SkipGracefulOrchestrationTerminationsResponse(_message.Message):
12501258
def __init__(self, unterminatedInstanceIds: _Optional[_Iterable[str]] = ...) -> None: ...
12511259

12521260
class GetWorkItemsRequest(_message.Message):
1253-
__slots__ = ("maxConcurrentOrchestrationWorkItems", "maxConcurrentActivityWorkItems", "maxConcurrentEntityWorkItems", "capabilities")
1261+
__slots__ = ("maxConcurrentOrchestrationWorkItems", "maxConcurrentActivityWorkItems", "maxConcurrentEntityWorkItems", "capabilities", "workItemFilters")
12541262
MAXCONCURRENTORCHESTRATIONWORKITEMS_FIELD_NUMBER: _ClassVar[int]
12551263
MAXCONCURRENTACTIVITYWORKITEMS_FIELD_NUMBER: _ClassVar[int]
12561264
MAXCONCURRENTENTITYWORKITEMS_FIELD_NUMBER: _ClassVar[int]
12571265
CAPABILITIES_FIELD_NUMBER: _ClassVar[int]
1266+
WORKITEMFILTERS_FIELD_NUMBER: _ClassVar[int]
12581267
maxConcurrentOrchestrationWorkItems: int
12591268
maxConcurrentActivityWorkItems: int
12601269
maxConcurrentEntityWorkItems: int
12611270
capabilities: _containers.RepeatedScalarFieldContainer[WorkerCapability]
1262-
def __init__(self, maxConcurrentOrchestrationWorkItems: _Optional[int] = ..., maxConcurrentActivityWorkItems: _Optional[int] = ..., maxConcurrentEntityWorkItems: _Optional[int] = ..., capabilities: _Optional[_Iterable[_Union[WorkerCapability, str]]] = ...) -> None: ...
1271+
workItemFilters: WorkItemFilters
1272+
def __init__(self, maxConcurrentOrchestrationWorkItems: _Optional[int] = ..., maxConcurrentActivityWorkItems: _Optional[int] = ..., maxConcurrentEntityWorkItems: _Optional[int] = ..., capabilities: _Optional[_Iterable[_Union[WorkerCapability, str]]] = ..., workItemFilters: _Optional[_Union[WorkItemFilters, _Mapping]] = ...) -> None: ...
1273+
1274+
class WorkItemFilters(_message.Message):
1275+
__slots__ = ("orchestrations", "activities", "entities")
1276+
ORCHESTRATIONS_FIELD_NUMBER: _ClassVar[int]
1277+
ACTIVITIES_FIELD_NUMBER: _ClassVar[int]
1278+
ENTITIES_FIELD_NUMBER: _ClassVar[int]
1279+
orchestrations: _containers.RepeatedCompositeFieldContainer[OrchestrationFilter]
1280+
activities: _containers.RepeatedCompositeFieldContainer[ActivityFilter]
1281+
entities: _containers.RepeatedCompositeFieldContainer[EntityFilter]
1282+
def __init__(self, orchestrations: _Optional[_Iterable[_Union[OrchestrationFilter, _Mapping]]] = ..., activities: _Optional[_Iterable[_Union[ActivityFilter, _Mapping]]] = ..., entities: _Optional[_Iterable[_Union[EntityFilter, _Mapping]]] = ...) -> None: ...
1283+
1284+
class OrchestrationFilter(_message.Message):
1285+
__slots__ = ("name", "versions")
1286+
NAME_FIELD_NUMBER: _ClassVar[int]
1287+
VERSIONS_FIELD_NUMBER: _ClassVar[int]
1288+
name: str
1289+
versions: _containers.RepeatedScalarFieldContainer[str]
1290+
def __init__(self, name: _Optional[str] = ..., versions: _Optional[_Iterable[str]] = ...) -> None: ...
1291+
1292+
class ActivityFilter(_message.Message):
1293+
__slots__ = ("name", "versions")
1294+
NAME_FIELD_NUMBER: _ClassVar[int]
1295+
VERSIONS_FIELD_NUMBER: _ClassVar[int]
1296+
name: str
1297+
versions: _containers.RepeatedScalarFieldContainer[str]
1298+
def __init__(self, name: _Optional[str] = ..., versions: _Optional[_Iterable[str]] = ...) -> None: ...
1299+
1300+
class EntityFilter(_message.Message):
1301+
__slots__ = ("name",)
1302+
NAME_FIELD_NUMBER: _ClassVar[int]
1303+
name: str
1304+
def __init__(self, name: _Optional[str] = ...) -> None: ...
12631305

12641306
class WorkItem(_message.Message):
12651307
__slots__ = ("orchestratorRequest", "activityRequest", "entityRequest", "healthPing", "entityRequestV2", "completionToken")

durabletask/testing/README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,7 @@ The in-memory backend is designed for testing and has some limitations compared
255255
1. **No persistence**: All state is lost when the backend is stopped
256256
2. **No distributed execution**: Runs in a single process
257257
3. **No history streaming**: StreamInstanceHistory is not implemented
258-
4. **No rewind**: RewindInstance is not implemented
259-
5. **No recursive termination**: Recursive termination is not supported
258+
4. **No recursive termination**: Recursive termination is not supported
260259

261260
### Best Practices
262261

durabletask/testing/in_memory_backend.py

Lines changed: 150 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -981,8 +981,33 @@ def DeleteTaskHub(self, request: pb.DeleteTaskHubRequest, context):
981981
return pb.DeleteTaskHubResponse()
982982

983983
def RewindInstance(self, request: pb.RewindInstanceRequest, context):
    """Handle a request to rewind a failed orchestration instance.

    The instance must exist and be in the FAILED state; otherwise the
    call is aborted with NOT_FOUND or FAILED_PRECONDITION respectively.
    On success the instance is handed to ``_prepare_rewind``, which
    queues an ``ExecutionRewoundEvent``, resets the status to RUNNING and
    re-enqueues the orchestration so the worker can replay it and emit a
    ``RewindOrchestrationAction`` carrying the corrected history.
    """
    with self._lock:
        instance = self._instances.get(request.instanceId)

        # Work out which precondition (if any) is violated, then abort
        # from a single place.
        rejection = None
        if not instance:
            rejection = (
                grpc.StatusCode.NOT_FOUND,
                f"Orchestration instance '{request.instanceId}' not found",
            )
        elif instance.status != pb.ORCHESTRATION_STATUS_FAILED:
            rejection = (
                grpc.StatusCode.FAILED_PRECONDITION,
                f"Orchestration instance '{request.instanceId}' is not in a failed state",
            )

        if rejection is not None:
            status_code, details = rejection
            context.abort(status_code, details)
            # Still hand back a response in case abort() returns
            # instead of raising.
            return pb.RewindInstanceResponse()

        # The reason is optional on the wire; only unwrap it when set.
        reason = None
        if request.HasField("reason"):
            reason = request.reason.value
        self._prepare_rewind(instance, reason)

    self._logger.info(f"Rewound instance '{request.instanceId}'")
    return pb.RewindInstanceResponse()
9861011

9871012
def AbandonTaskActivityWorkItem(self, request: pb.AbandonActivityTaskRequest, context):
9881013
"""Abandons an activity work item."""
@@ -1196,6 +1221,8 @@ def _process_action(self, instance: OrchestrationInstance, action: pb.Orchestrat
11961221
self._process_send_event_action(action.sendEvent)
11971222
elif action.HasField("sendEntityMessage"):
11981223
self._process_send_entity_message_action(instance, action)
1224+
elif action.HasField("rewindOrchestration"):
1225+
self._process_rewind_orchestration_action(instance, action.rewindOrchestration)
11991226

12001227
def _process_complete_orchestration_action(self, instance: OrchestrationInstance,
12011228
complete_action: pb.CompleteOrchestrationAction):
@@ -1205,6 +1232,14 @@ def _process_complete_orchestration_action(self, instance: OrchestrationInstance
12051232
instance.output = complete_action.result.value if complete_action.result else None
12061233
instance.failure_details = complete_action.failureDetails if complete_action.failureDetails else None
12071234

1235+
# Append orchestratorCompleted to history when the orchestration
1236+
# reaches a terminal state. This positional marker allows the
1237+
# SDK to distinguish a post-rewind replay from a new rewind
1238+
# request by comparing the position of the last
1239+
# orchestratorCompleted against the last executionRewound.
1240+
if status != pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW:
1241+
instance.history.append(helpers.new_orchestrator_completed_event())
1242+
12081243
if status == pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW:
12091244
# Handle continue-as-new
12101245
new_input = complete_action.result.value if complete_action.result else None
@@ -1558,6 +1593,119 @@ def _signal_entity_internal(self, entity_id: str, operation: str,
15581593
)
15591594
self._queue_entity_operation(entity_id, event)
15601595

1596+
def _prepare_rewind(self, instance: OrchestrationInstance,
                    reason: Optional[str] = None):
    """Reset a failed instance and queue it for a rewind replay.

    Clears the terminal-state fields, marks the instance RUNNING, queues
    an ``ExecutionRewoundEvent`` in its pending events and re-enqueues it
    for dispatch. The history itself is not rewritten here; that is done
    by the SDK worker when it processes the rewind event.

    Args:
        instance: The orchestration instance to rewind.
        reason: Optional reason string for the rewind.

    Note:
        Must be called while holding ``self._lock``.
    """
    # Put the instance back into a processable state.
    instance.status = pb.ORCHESTRATION_STATUS_RUNNING
    instance.output = None
    instance.failure_details = None
    instance.last_updated_at = datetime.now(timezone.utc)

    # Drop events left over from the failed run's last dispatch.
    instance.dispatched_events.clear()

    # Queue the ExecutionRewound marker as a fresh pending event. The
    # reason wrapper is only attached for a non-empty reason.
    wrapped_reason = wrappers_pb2.StringValue(value=reason) if reason else None
    rewound_payload = pb.ExecutionRewoundEvent(reason=wrapped_reason)
    instance.pending_events.append(
        pb.HistoryEvent(
            eventId=-1,
            timestamp=timestamp_pb2.Timestamp(),
            executionRewound=rewound_payload,
        )
    )

    # Issue a new completion token, then hand the instance back to the
    # dispatch queue.
    token = self._next_completion_token
    instance.completion_token = token
    self._next_completion_token = token + 1

    target_id = instance.instance_id
    self._orchestration_in_flight.discard(target_id)
    self._enqueue_orchestration(target_id)
1636+
1637+
def _process_rewind_orchestration_action(
        self, instance: OrchestrationInstance,
        rewind_action: pb.RewindOrchestrationAction):
    """Processes a RewindOrchestrationAction returned by the SDK.

    The action contains a ``newHistory`` field with the rewritten
    history computed by the SDK (failed tasks and sub-orchestration
    failures removed). The backend replaces the instance's history
    with this new history, recursively rewinds any failed
    sub-orchestrations, and re-enqueues the orchestration.

    The most recent ``executionRewound`` event in the new history is
    treated as the rewind currently in progress: its reason is forwarded
    to the sub-orchestrations and the same event is re-queued for the
    worker.

    Args:
        instance: The orchestration instance whose history is replaced.
        rewind_action: The action carrying the rewritten history.

    Note:
        Calls ``_prepare_rewind``, which is documented as requiring
        ``self._lock`` to be held, so callers are presumably expected to
        hold the lock here as well (TODO confirm against the caller).
    """
    new_history = list(rewind_action.newHistory)

    # Replace history with the rewritten version.
    instance.history = new_history
    instance.status = pb.ORCHESTRATION_STATUS_RUNNING
    instance.output = None
    instance.failure_details = None
    instance.last_updated_at = datetime.now(timezone.utc)

    # Identify sub-orchestrations that were created but did not
    # complete successfully — they need to be recursively rewound.
    completed_sub_orch_task_ids: set[int] = set()
    created_sub_orchs: dict[int, str] = {}
    for event in new_history:
        if event.HasField("subOrchestrationInstanceCreated"):
            created_sub_orchs[event.eventId] = (
                event.subOrchestrationInstanceCreated.instanceId)
        elif event.HasField("subOrchestrationInstanceCompleted"):
            completed_sub_orch_task_ids.add(
                event.subOrchestrationInstanceCompleted.taskScheduledId)

    # Locate the most recent ExecutionRewound event once and use it for
    # both the reason and the re-dispatched event. Scanning forward for
    # the event but backward for the reason (as was done previously)
    # could pair a stale event with the current reason when an instance
    # has been rewound more than once.
    rewind_event = next(
        (event for event in reversed(new_history)
         if event.HasField("executionRewound")),
        None)
    reason: Optional[str] = None
    if (rewind_event is not None
            and rewind_event.executionRewound.HasField("reason")):
        reason = rewind_event.executionRewound.reason.value

    # Recursively rewind failed sub-orchestrations.
    for task_id, sub_instance_id in created_sub_orchs.items():
        if task_id in completed_sub_orch_task_ids:
            continue
        sub_instance = self._instances.get(sub_instance_id)
        if (sub_instance
                and sub_instance.status == pb.ORCHESTRATION_STATUS_FAILED):
            self._prepare_rewind(sub_instance, reason)
            self._watch_sub_orchestration(
                instance.instance_id, sub_instance_id, task_id)

    # Re-enqueue so the orchestration replays with the clean history.
    # The executionRewound event is added to pending_events so the
    # worker can see it in new_events; the worker uses the presence
    # of executionRewound in old_events (history) to distinguish
    # this normal post-rewind replay from the initial rewind
    # short-circuit. Note: we do NOT add orchestratorStarted here
    # because the work-item dispatch loop already inserts one when
    # the instance has non-empty history.
    instance.pending_events.clear()
    instance.dispatched_events.clear()
    if rewind_event is not None:
        instance.pending_events.append(rewind_event)
    instance.completion_token = self._next_completion_token
    self._next_completion_token += 1
    self._orchestration_in_flight.discard(instance.instance_id)
    self._enqueue_orchestration(instance.instance_id)
1708+
15611709
def _enqueue_entity(self, entity_id: str):
15621710
"""Enqueues an entity for processing."""
15631711
if entity_id not in self._entity_queue_set:

0 commit comments

Comments
 (0)