Skip to content

Commit 57d8b7d

Browse files
committed
Implement long timer support
1 parent 36910b2 commit 57d8b7d

File tree

3 files changed

+360
-31
lines changed

3 files changed

+360
-31
lines changed

durabletask/task.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -520,13 +520,32 @@ def compute_next_delay(self) -> Optional[timedelta]:
520520
return None
521521

522522

523-
class TimerTask(CancellableTask[T]):
523+
class TimerTask(CancellableTask[None]):
524+
def set_retryable_parent(self, retryable_task: RetryableTask):
525+
self._retryable_parent = retryable_task
524526

525-
def __init__(self) -> None:
527+
def complete(self, *args, **kwargs):
528+
super().complete(None)
529+
530+
531+
class LongTimerTask(TimerTask):
532+
def __init__(self, final_fire_at: datetime, maximum_timer_duration: timedelta):
526533
super().__init__()
534+
self._final_fire_at = final_fire_at
535+
self._maximum_timer_duration = maximum_timer_duration
527536

528-
def set_retryable_parent(self, retryable_task: RetryableTask):
529-
self._retryable_parent = retryable_task
537+
def start(self, current_utc_datetime: datetime) -> datetime:
538+
return self._get_next_fire_at(current_utc_datetime)
539+
540+
def complete(self, current_utc_datetime: datetime):
541+
if current_utc_datetime < self._final_fire_at:
542+
return self._get_next_fire_at(current_utc_datetime)
543+
super().complete(None)
544+
545+
def _get_next_fire_at(self, current_utc_datetime: datetime) -> datetime:
546+
if current_utc_datetime + self._maximum_timer_duration < self._final_fire_at:
547+
return current_utc_datetime + self._maximum_timer_duration
548+
return self._final_fire_at
530549

531550

532551
class WhenAnyTask(CompositeTask[Task]):

durabletask/worker.py

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
TInput = TypeVar("TInput")
3939
TOutput = TypeVar("TOutput")
4040
DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
41+
DEFAULT_MAXIMUM_TIMER_INTERVAL = timedelta(days=3)
4142

4243

4344
class ConcurrencyOptions:
@@ -320,6 +321,7 @@ def __init__(
320321
secure_channel: bool = False,
321322
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
322323
concurrency_options: Optional[ConcurrencyOptions] = None,
324+
maximum_timer_interval: Optional[timedelta] = DEFAULT_MAXIMUM_TIMER_INTERVAL
323325
):
324326
self._registry = _Registry()
325327
self._host_address = (
@@ -348,12 +350,18 @@ def __init__(
348350
self._interceptors = None
349351

350352
self._async_worker_manager = _AsyncWorkerManager(self._concurrency_options, self._logger)
353+
self._maximum_timer_interval = maximum_timer_interval
351354

352355
@property
353356
def concurrency_options(self) -> ConcurrencyOptions:
354357
"""Get the current concurrency options for this worker."""
355358
return self._concurrency_options
356359

360+
@property
361+
def maximum_timer_interval(self) -> Optional[timedelta]:
362+
"""Get the configured maximum timer interval for long timer chunking."""
363+
return self._maximum_timer_interval
364+
357365
def __enter__(self):
358366
return self
359367

@@ -826,7 +834,11 @@ class _RuntimeOrchestrationContext(task.OrchestrationContext):
826834
_generator: Optional[Generator[task.Task, Any, Any]]
827835
_previous_task: Optional[task.Task]
828836

829-
def __init__(self, instance_id: str, registry: _Registry):
837+
def __init__(self,
838+
instance_id: str,
839+
registry: _Registry,
840+
maximum_timer_interval: Optional[timedelta] = DEFAULT_MAXIMUM_TIMER_INTERVAL,
841+
):
830842
self._generator = None
831843
self._is_replaying = True
832844
self._is_complete = False
@@ -851,6 +863,7 @@ def __init__(self, instance_id: str, registry: _Registry):
851863
self._new_input: Optional[Any] = None
852864
self._save_events = False
853865
self._encoded_custom_status: Optional[str] = None
866+
self._maximum_timer_interval = maximum_timer_interval
854867

855868
def run(self, generator: Generator[task.Task, Any, Any]):
856869
self._generator = generator
@@ -1026,11 +1039,20 @@ def create_timer_internal(
10261039
) -> task.TimerTask:
10271040
id = self.next_sequence_number()
10281041
if isinstance(fire_at, timedelta):
1029-
fire_at = self.current_utc_datetime + fire_at
1030-
action = ph.new_create_timer_action(id, fire_at)
1031-
self._pending_actions[id] = action
1042+
final_fire_at = self.current_utc_datetime + fire_at
1043+
else:
1044+
final_fire_at = fire_at
10321045

1033-
timer_task = task.TimerTask()
1046+
next_fire_at: datetime = final_fire_at
1047+
1048+
if self._maximum_timer_interval is not None and self.current_utc_datetime + self._maximum_timer_interval < final_fire_at:
1049+
timer_task = task.LongTimerTask(final_fire_at, self._maximum_timer_interval)
1050+
next_fire_at = timer_task.start(self.current_utc_datetime)
1051+
else:
1052+
timer_task = task.TimerTask()
1053+
1054+
action = ph.new_create_timer_action(id, next_fire_at)
1055+
self._pending_actions[id] = action
10341056

10351057
def _cancel_timer() -> None:
10361058
self._pending_actions.pop(id, None)
@@ -1311,9 +1333,13 @@ def __init__(
13111333
class _OrchestrationExecutor:
13121334
_generator: Optional[task.Orchestrator] = None
13131335

1314-
def __init__(self, registry: _Registry, logger: logging.Logger):
1336+
def __init__(self,
1337+
registry: _Registry,
1338+
logger: logging.Logger,
1339+
maximum_timer_interval: Optional[timedelta] = DEFAULT_MAXIMUM_TIMER_INTERVAL):
13151340
self._registry = registry
13161341
self._logger = logger
1342+
self._maximum_timer_interval = maximum_timer_interval
13171343
self._is_suspended = False
13181344
self._suspended_events: list[pb.HistoryEvent] = []
13191345

@@ -1337,7 +1363,11 @@ def execute(
13371363
"The new history event list must have at least one event in it."
13381364
)
13391365

1340-
ctx = _RuntimeOrchestrationContext(instance_id, self._registry)
1366+
ctx = _RuntimeOrchestrationContext(
1367+
instance_id,
1368+
self._registry,
1369+
maximum_timer_interval=self._maximum_timer_interval,
1370+
)
13411371
try:
13421372
# Rebuild local state by replaying old history into the orchestrator function
13431373
self._logger.debug(
@@ -1473,34 +1503,46 @@ def process_event(
14731503
f"{ctx.instance_id}: Ignoring unexpected timerFired event with ID = {timer_id}."
14741504
)
14751505
return
1476-
if not isinstance(timer_task, task.TimerTask):
1506+
if not (isinstance(timer_task, task.TimerTask) or isinstance(timer_task, task.LongTimerTask)):
14771507
if not ctx._is_replaying:
14781508
self._logger.warning(
14791509
f"{ctx.instance_id}: Ignoring timerFired event with non-timer task ID = {timer_id}."
14801510
)
14811511
return
14821512

1483-
timer_task.complete(None)
1484-
if timer_task._retryable_parent is not None:
1485-
activity_action = timer_task._retryable_parent._action
1513+
next_fire_at = timer_task.complete(event.timerFired.fireAt.ToDatetime())
1514+
if next_fire_at is not None:
1515+
id = ctx.next_sequence_number()
1516+
new_action = ph.new_create_timer_action(id, next_fire_at)
1517+
ctx._pending_tasks[id] = timer_task
1518+
ctx._pending_actions[id] = new_action
14861519

1487-
if not timer_task._retryable_parent._is_sub_orch:
1488-
cur_task = activity_action.scheduleTask
1489-
instance_id = None
1490-
else:
1491-
cur_task = activity_action.createSubOrchestration
1492-
instance_id = cur_task.instanceId
1493-
ctx.call_activity_function_helper(
1494-
id=activity_action.id,
1495-
activity_function=cur_task.name,
1496-
input=cur_task.input.value,
1497-
retry_policy=timer_task._retryable_parent._retry_policy,
1498-
is_sub_orch=timer_task._retryable_parent._is_sub_orch,
1499-
instance_id=instance_id,
1500-
fn_task=timer_task._retryable_parent,
1501-
)
1520+
def _cancel_timer() -> None:
1521+
ctx._pending_actions.pop(id, None)
1522+
ctx._pending_tasks.pop(id, None)
1523+
1524+
timer_task.set_cancel_handler(_cancel_timer)
15021525
else:
1503-
ctx.resume()
1526+
if timer_task._retryable_parent is not None:
1527+
activity_action = timer_task._retryable_parent._action
1528+
1529+
if not timer_task._retryable_parent._is_sub_orch:
1530+
cur_task = activity_action.scheduleTask
1531+
instance_id = None
1532+
else:
1533+
cur_task = activity_action.createSubOrchestration
1534+
instance_id = cur_task.instanceId
1535+
ctx.call_activity_function_helper(
1536+
id=activity_action.id,
1537+
activity_function=cur_task.name,
1538+
input=cur_task.input.value,
1539+
retry_policy=timer_task._retryable_parent._retry_policy,
1540+
is_sub_orch=timer_task._retryable_parent._is_sub_orch,
1541+
instance_id=instance_id,
1542+
fn_task=timer_task._retryable_parent,
1543+
)
1544+
else:
1545+
ctx.resume()
15041546
elif event.HasField("taskScheduled"):
15051547
# This history event confirms that the activity execution was successfully scheduled.
15061548
# Remove the taskScheduled event from the pending action list so we don't schedule it again.

0 commit comments

Comments
 (0)