Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,26 @@ def resume_orchestration(self, instance_id: str):
self._logger.info(f"Resuming instance '{instance_id}'.")
self._stub.ResumeInstance(req)

def rewind_orchestration(self, instance_id: str, *,
reason: Optional[str] = None):
"""Rewinds a failed orchestration instance to its last known good state.

Rewind removes failed task and sub-orchestration results from the
orchestration history and replays the orchestration from the last
successful checkpoint. Activities that previously succeeded are
not re-executed; only failed work is retried.

Args:
instance_id: The ID of the orchestration instance to rewind.
reason: An optional reason string describing why the orchestration is being rewound.
"""
req = pb.RewindInstanceRequest(
instanceId=instance_id,
reason=helpers.get_string_value(reason))

self._logger.info(f"Rewinding instance '{instance_id}'.")
self._stub.RewindInstance(req)

def restart_orchestration(self, instance_id: str, *,
restart_with_new_instance_id: bool = False) -> str:
"""Restarts an existing orchestration instance.
Expand Down
3 changes: 2 additions & 1 deletion durabletask/internal/PROTO_SOURCE_COMMIT_HASH
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
443b333f4f65a438dc9eb4f090560d232afec4b7
fd9369c6a03d6af4e95285e432b7c4e943c06970
026329c53fe6363985655857b9ca848ec7238bd2
026329c53fe6363985655857b9ca848ec7238bd2
57930bf659bb4c90cfeae44eaf465e000a67ecf1 // DO NOT MERGE - FEATURE BRANCH
15 changes: 15 additions & 0 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ def new_terminated_event(*, encoded_output: Optional[str] = None) -> pb.HistoryE
)


def new_execution_completed_event(
status: 'pb.OrchestrationStatus',
encoded_result: Optional[str] = None,
failure_details: Optional['pb.TaskFailureDetails'] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
executionCompleted=pb.ExecutionCompletedEvent(
orchestrationStatus=status,
result=get_string_value(encoded_result),
failureDetails=failure_details,
)
)


def get_string_value(val: Optional[str]) -> Optional[wrappers_pb2.StringValue]:
if val is None:
return None
Expand Down
308 changes: 159 additions & 149 deletions durabletask/internal/orchestrator_service_pb2.py

Large diffs are not rendered by default.

50 changes: 46 additions & 4 deletions durabletask/internal/orchestrator_service_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,14 @@ class SendEntityMessageAction(_message.Message):
entityUnlockSent: EntityUnlockSentEvent
def __init__(self, entityOperationSignaled: _Optional[_Union[EntityOperationSignaledEvent, _Mapping]] = ..., entityOperationCalled: _Optional[_Union[EntityOperationCalledEvent, _Mapping]] = ..., entityLockRequested: _Optional[_Union[EntityLockRequestedEvent, _Mapping]] = ..., entityUnlockSent: _Optional[_Union[EntityUnlockSentEvent, _Mapping]] = ...) -> None: ...

class RewindOrchestrationAction(_message.Message):
__slots__ = ("newHistory",)
NEWHISTORY_FIELD_NUMBER: _ClassVar[int]
newHistory: _containers.RepeatedCompositeFieldContainer[HistoryEvent]
def __init__(self, newHistory: _Optional[_Iterable[_Union[HistoryEvent, _Mapping]]] = ...) -> None: ...

class OrchestratorAction(_message.Message):
__slots__ = ("id", "scheduleTask", "createSubOrchestration", "createTimer", "sendEvent", "completeOrchestration", "terminateOrchestration", "sendEntityMessage")
__slots__ = ("id", "scheduleTask", "createSubOrchestration", "createTimer", "sendEvent", "completeOrchestration", "terminateOrchestration", "sendEntityMessage", "rewindOrchestration")
ID_FIELD_NUMBER: _ClassVar[int]
SCHEDULETASK_FIELD_NUMBER: _ClassVar[int]
CREATESUBORCHESTRATION_FIELD_NUMBER: _ClassVar[int]
Expand All @@ -603,6 +609,7 @@ class OrchestratorAction(_message.Message):
COMPLETEORCHESTRATION_FIELD_NUMBER: _ClassVar[int]
TERMINATEORCHESTRATION_FIELD_NUMBER: _ClassVar[int]
SENDENTITYMESSAGE_FIELD_NUMBER: _ClassVar[int]
REWINDORCHESTRATION_FIELD_NUMBER: _ClassVar[int]
id: int
scheduleTask: ScheduleTaskAction
createSubOrchestration: CreateSubOrchestrationAction
Expand All @@ -611,7 +618,8 @@ class OrchestratorAction(_message.Message):
completeOrchestration: CompleteOrchestrationAction
terminateOrchestration: TerminateOrchestrationAction
sendEntityMessage: SendEntityMessageAction
def __init__(self, id: _Optional[int] = ..., scheduleTask: _Optional[_Union[ScheduleTaskAction, _Mapping]] = ..., createSubOrchestration: _Optional[_Union[CreateSubOrchestrationAction, _Mapping]] = ..., createTimer: _Optional[_Union[CreateTimerAction, _Mapping]] = ..., sendEvent: _Optional[_Union[SendEventAction, _Mapping]] = ..., completeOrchestration: _Optional[_Union[CompleteOrchestrationAction, _Mapping]] = ..., terminateOrchestration: _Optional[_Union[TerminateOrchestrationAction, _Mapping]] = ..., sendEntityMessage: _Optional[_Union[SendEntityMessageAction, _Mapping]] = ...) -> None: ...
rewindOrchestration: RewindOrchestrationAction
def __init__(self, id: _Optional[int] = ..., scheduleTask: _Optional[_Union[ScheduleTaskAction, _Mapping]] = ..., createSubOrchestration: _Optional[_Union[CreateSubOrchestrationAction, _Mapping]] = ..., createTimer: _Optional[_Union[CreateTimerAction, _Mapping]] = ..., sendEvent: _Optional[_Union[SendEventAction, _Mapping]] = ..., completeOrchestration: _Optional[_Union[CompleteOrchestrationAction, _Mapping]] = ..., terminateOrchestration: _Optional[_Union[TerminateOrchestrationAction, _Mapping]] = ..., sendEntityMessage: _Optional[_Union[SendEntityMessageAction, _Mapping]] = ..., rewindOrchestration: _Optional[_Union[RewindOrchestrationAction, _Mapping]] = ...) -> None: ...

class OrchestrationTraceContext(_message.Message):
__slots__ = ("spanID", "spanStartTime")
Expand Down Expand Up @@ -1250,16 +1258,50 @@ class SkipGracefulOrchestrationTerminationsResponse(_message.Message):
def __init__(self, unterminatedInstanceIds: _Optional[_Iterable[str]] = ...) -> None: ...

class GetWorkItemsRequest(_message.Message):
__slots__ = ("maxConcurrentOrchestrationWorkItems", "maxConcurrentActivityWorkItems", "maxConcurrentEntityWorkItems", "capabilities")
__slots__ = ("maxConcurrentOrchestrationWorkItems", "maxConcurrentActivityWorkItems", "maxConcurrentEntityWorkItems", "capabilities", "workItemFilters")
MAXCONCURRENTORCHESTRATIONWORKITEMS_FIELD_NUMBER: _ClassVar[int]
MAXCONCURRENTACTIVITYWORKITEMS_FIELD_NUMBER: _ClassVar[int]
MAXCONCURRENTENTITYWORKITEMS_FIELD_NUMBER: _ClassVar[int]
CAPABILITIES_FIELD_NUMBER: _ClassVar[int]
WORKITEMFILTERS_FIELD_NUMBER: _ClassVar[int]
maxConcurrentOrchestrationWorkItems: int
maxConcurrentActivityWorkItems: int
maxConcurrentEntityWorkItems: int
capabilities: _containers.RepeatedScalarFieldContainer[WorkerCapability]
def __init__(self, maxConcurrentOrchestrationWorkItems: _Optional[int] = ..., maxConcurrentActivityWorkItems: _Optional[int] = ..., maxConcurrentEntityWorkItems: _Optional[int] = ..., capabilities: _Optional[_Iterable[_Union[WorkerCapability, str]]] = ...) -> None: ...
workItemFilters: WorkItemFilters
def __init__(self, maxConcurrentOrchestrationWorkItems: _Optional[int] = ..., maxConcurrentActivityWorkItems: _Optional[int] = ..., maxConcurrentEntityWorkItems: _Optional[int] = ..., capabilities: _Optional[_Iterable[_Union[WorkerCapability, str]]] = ..., workItemFilters: _Optional[_Union[WorkItemFilters, _Mapping]] = ...) -> None: ...

class WorkItemFilters(_message.Message):
__slots__ = ("orchestrations", "activities", "entities")
ORCHESTRATIONS_FIELD_NUMBER: _ClassVar[int]
ACTIVITIES_FIELD_NUMBER: _ClassVar[int]
ENTITIES_FIELD_NUMBER: _ClassVar[int]
orchestrations: _containers.RepeatedCompositeFieldContainer[OrchestrationFilter]
activities: _containers.RepeatedCompositeFieldContainer[ActivityFilter]
entities: _containers.RepeatedCompositeFieldContainer[EntityFilter]
def __init__(self, orchestrations: _Optional[_Iterable[_Union[OrchestrationFilter, _Mapping]]] = ..., activities: _Optional[_Iterable[_Union[ActivityFilter, _Mapping]]] = ..., entities: _Optional[_Iterable[_Union[EntityFilter, _Mapping]]] = ...) -> None: ...

class OrchestrationFilter(_message.Message):
__slots__ = ("name", "versions")
NAME_FIELD_NUMBER: _ClassVar[int]
VERSIONS_FIELD_NUMBER: _ClassVar[int]
name: str
versions: _containers.RepeatedScalarFieldContainer[str]
def __init__(self, name: _Optional[str] = ..., versions: _Optional[_Iterable[str]] = ...) -> None: ...

class ActivityFilter(_message.Message):
__slots__ = ("name", "versions")
NAME_FIELD_NUMBER: _ClassVar[int]
VERSIONS_FIELD_NUMBER: _ClassVar[int]
name: str
versions: _containers.RepeatedScalarFieldContainer[str]
def __init__(self, name: _Optional[str] = ..., versions: _Optional[_Iterable[str]] = ...) -> None: ...

class EntityFilter(_message.Message):
__slots__ = ("name",)
NAME_FIELD_NUMBER: _ClassVar[int]
name: str
def __init__(self, name: _Optional[str] = ...) -> None: ...

class WorkItem(_message.Message):
__slots__ = ("orchestratorRequest", "activityRequest", "entityRequest", "healthPing", "entityRequestV2", "completionToken")
Expand Down
3 changes: 1 addition & 2 deletions durabletask/testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,7 @@ The in-memory backend is designed for testing and has some limitations compared
1. **No persistence**: All state is lost when the backend is stopped
2. **No distributed execution**: Runs in a single process
3. **No history streaming**: StreamInstanceHistory is not implemented
4. **No rewind**: RewindInstance is not implemented
5. **No recursive termination**: Recursive termination is not supported
4. **No recursive termination**: Recursive termination is not supported

### Best Practices

Expand Down
Loading
Loading