Skip to content

Commit c8f8b5f

Browse files
Add support for more timeouts to Nexus operations (#1276)
* Add support for more timeouts to Nexus operations Add schedule-to-start and start-to-close for Nexus operations * fix format * fic workflow ABC * Add test * Remove unused class * Fix rebase
1 parent 3d64bf7 commit c8f8b5f

4 files changed

Lines changed: 198 additions & 0 deletions

File tree

temporalio/worker/_interceptor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,8 @@ class StartNexusOperationInput(Generic[InputT, OutputT]):
303303
operation: nexusrpc.Operation[InputT, OutputT] | str | Callable[..., Any]
304304
input: InputT
305305
schedule_to_close_timeout: timedelta | None
306+
schedule_to_start_timeout: timedelta | None
307+
start_to_close_timeout: timedelta | None
306308
cancellation_type: temporalio.workflow.NexusOperationCancellationType
307309
headers: Mapping[str, str] | None
308310
summary: str | None

temporalio/worker/_workflow_instance.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1596,6 +1596,8 @@ async def workflow_start_nexus_operation(
15961596
input: Any,
15971597
output_type: type[OutputT] | None,
15981598
schedule_to_close_timeout: timedelta | None,
1599+
schedule_to_start_timeout: timedelta | None,
1600+
start_to_close_timeout: timedelta | None,
15991601
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
16001602
headers: Mapping[str, str] | None,
16011603
summary: str | None,
@@ -1609,6 +1611,8 @@ async def workflow_start_nexus_operation(
16091611
input=input,
16101612
output_type=output_type,
16111613
schedule_to_close_timeout=schedule_to_close_timeout,
1614+
schedule_to_start_timeout=schedule_to_start_timeout,
1615+
start_to_close_timeout=start_to_close_timeout,
16121616
cancellation_type=cancellation_type,
16131617
headers=headers,
16141618
summary=summary,
@@ -3340,6 +3344,12 @@ def _apply_schedule_command(self) -> None:
33403344
v.schedule_to_close_timeout.FromTimedelta(
33413345
self._input.schedule_to_close_timeout
33423346
)
3347+
if self._input.schedule_to_start_timeout is not None:
3348+
v.schedule_to_start_timeout.FromTimedelta(
3349+
self._input.schedule_to_start_timeout
3350+
)
3351+
if self._input.start_to_close_timeout is not None:
3352+
v.start_to_close_timeout.FromTimedelta(self._input.start_to_close_timeout)
33433353
v.cancellation_type = cast(
33443354
temporalio.bridge.proto.nexus.NexusOperationCancellationType.ValueType,
33453355
int(self._input.cancellation_type),

temporalio/workflow.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,8 @@ async def workflow_start_nexus_operation(
856856
input: Any,
857857
output_type: type[OutputT] | None,
858858
schedule_to_close_timeout: timedelta | None,
859+
schedule_to_start_timeout: timedelta | None,
860+
start_to_close_timeout: timedelta | None,
859861
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
860862
headers: Mapping[str, str] | None,
861863
summary: str | None,
@@ -5418,6 +5420,8 @@ async def start_operation(
54185420
*,
54195421
output_type: type[OutputT] | None = None,
54205422
schedule_to_close_timeout: timedelta | None = None,
5423+
schedule_to_start_timeout: timedelta | None = None,
5424+
start_to_close_timeout: timedelta | None = None,
54215425
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
54225426
headers: Mapping[str, str] | None = None,
54235427
summary: str | None = None,
@@ -5433,6 +5437,8 @@ async def start_operation(
54335437
*,
54345438
output_type: type[OutputT] | None = None,
54355439
schedule_to_close_timeout: timedelta | None = None,
5440+
schedule_to_start_timeout: timedelta | None = None,
5441+
start_to_close_timeout: timedelta | None = None,
54365442
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
54375443
headers: Mapping[str, str] | None = None,
54385444
summary: str | None = None,
@@ -5451,6 +5457,8 @@ async def start_operation(
54515457
*,
54525458
output_type: type[OutputT] | None = None,
54535459
schedule_to_close_timeout: timedelta | None = None,
5460+
schedule_to_start_timeout: timedelta | None = None,
5461+
start_to_close_timeout: timedelta | None = None,
54545462
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
54555463
headers: Mapping[str, str] | None = None,
54565464
summary: str | None = None,
@@ -5469,6 +5477,8 @@ async def start_operation(
54695477
*,
54705478
output_type: type[OutputT] | None = None,
54715479
schedule_to_close_timeout: timedelta | None = None,
5480+
schedule_to_start_timeout: timedelta | None = None,
5481+
start_to_close_timeout: timedelta | None = None,
54725482
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
54735483
headers: Mapping[str, str] | None = None,
54745484
summary: str | None = None,
@@ -5487,6 +5497,8 @@ async def start_operation(
54875497
*,
54885498
output_type: type[OutputT] | None = None,
54895499
schedule_to_close_timeout: timedelta | None = None,
5500+
schedule_to_start_timeout: timedelta | None = None,
5501+
start_to_close_timeout: timedelta | None = None,
54905502
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
54915503
headers: Mapping[str, str] | None = None,
54925504
summary: str | None = None,
@@ -5504,6 +5516,8 @@ async def start_operation(
55045516
*,
55055517
output_type: type[OutputT] | None = None,
55065518
schedule_to_close_timeout: timedelta | None = None,
5519+
schedule_to_start_timeout: timedelta | None = None,
5520+
start_to_close_timeout: timedelta | None = None,
55075521
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
55085522
headers: Mapping[str, str] | None = None,
55095523
summary: str | None = None,
@@ -5517,6 +5531,8 @@ async def start_operation(
55175531
*,
55185532
output_type: type[OutputT] | None = None,
55195533
schedule_to_close_timeout: timedelta | None = None,
5534+
schedule_to_start_timeout: timedelta | None = None,
5535+
start_to_close_timeout: timedelta | None = None,
55205536
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
55215537
headers: Mapping[str, str] | None = None,
55225538
summary: str | None = None,
@@ -5528,6 +5544,8 @@ async def start_operation(
55285544
input: The Nexus operation input.
55295545
output_type: The Nexus operation output type.
55305546
schedule_to_close_timeout: Timeout for the entire operation attempt.
5547+
schedule_to_start_timeout: Timeout for the operation to be started.
5548+
start_to_close_timeout: Timeout for async operations to complete after starting.
55315549
headers: Headers to send with the Nexus HTTP request.
55325550
55335551
Returns:
@@ -5548,6 +5566,8 @@ async def execute_operation(
55485566
*,
55495567
output_type: type[OutputT] | None = None,
55505568
schedule_to_close_timeout: timedelta | None = None,
5569+
schedule_to_start_timeout: timedelta | None = None,
5570+
start_to_close_timeout: timedelta | None = None,
55515571
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
55525572
headers: Mapping[str, str] | None = None,
55535573
summary: str | None = None,
@@ -5563,6 +5583,8 @@ async def execute_operation(
55635583
*,
55645584
output_type: type[OutputT] | None = None,
55655585
schedule_to_close_timeout: timedelta | None = None,
5586+
schedule_to_start_timeout: timedelta | None = None,
5587+
start_to_close_timeout: timedelta | None = None,
55665588
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
55675589
headers: Mapping[str, str] | None = None,
55685590
summary: str | None = None,
@@ -5581,6 +5603,8 @@ async def execute_operation(
55815603
*,
55825604
output_type: type[OutputT] | None = None,
55835605
schedule_to_close_timeout: timedelta | None = None,
5606+
schedule_to_start_timeout: timedelta | None = None,
5607+
start_to_close_timeout: timedelta | None = None,
55845608
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
55855609
headers: Mapping[str, str] | None = None,
55865610
summary: str | None = None,
@@ -5599,6 +5623,8 @@ async def execute_operation(
55995623
*,
56005624
output_type: type[OutputT] | None = None,
56015625
schedule_to_close_timeout: timedelta | None = None,
5626+
schedule_to_start_timeout: timedelta | None = None,
5627+
start_to_close_timeout: timedelta | None = None,
56025628
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
56035629
headers: Mapping[str, str] | None = None,
56045630
summary: str | None = None,
@@ -5617,6 +5643,8 @@ async def execute_operation(
56175643
*,
56185644
output_type: type[OutputT] | None = None,
56195645
schedule_to_close_timeout: timedelta | None = None,
5646+
schedule_to_start_timeout: timedelta | None = None,
5647+
start_to_close_timeout: timedelta | None = None,
56205648
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
56215649
headers: Mapping[str, str] | None = None,
56225650
summary: str | None = None,
@@ -5635,6 +5663,8 @@ async def execute_operation(
56355663
*,
56365664
output_type: type[OutputT] | None = None,
56375665
schedule_to_close_timeout: timedelta | None = None,
5666+
schedule_to_start_timeout: timedelta | None = None,
5667+
start_to_close_timeout: timedelta | None = None,
56385668
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
56395669
headers: Mapping[str, str] | None = None,
56405670
summary: str | None = None,
@@ -5648,6 +5678,8 @@ async def execute_operation(
56485678
*,
56495679
output_type: type[OutputT] | None = None,
56505680
schedule_to_close_timeout: timedelta | None = None,
5681+
schedule_to_start_timeout: timedelta | None = None,
5682+
start_to_close_timeout: timedelta | None = None,
56515683
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
56525684
headers: Mapping[str, str] | None = None,
56535685
summary: str | None = None,
@@ -5659,6 +5691,8 @@ async def execute_operation(
56595691
input: The Nexus operation input.
56605692
output_type: The Nexus operation output type.
56615693
schedule_to_close_timeout: Timeout for the entire operation attempt.
5694+
schedule_to_start_timeout: Timeout for the operation to be started.
5695+
start_to_close_timeout: Timeout for async operations to complete after starting.
56625696
headers: Headers to send with the Nexus HTTP request.
56635697
56645698
Returns:
@@ -5701,6 +5735,8 @@ async def start_operation(
57015735
*,
57025736
output_type: type[OutputT] | None = None,
57035737
schedule_to_close_timeout: timedelta | None = None,
5738+
schedule_to_start_timeout: timedelta | None = None,
5739+
start_to_close_timeout: timedelta | None = None,
57045740
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
57055741
headers: Mapping[str, str] | None = None,
57065742
summary: str | None = None,
@@ -5713,6 +5749,8 @@ async def start_operation(
57135749
input=input,
57145750
output_type=output_type,
57155751
schedule_to_close_timeout=schedule_to_close_timeout,
5752+
schedule_to_start_timeout=schedule_to_start_timeout,
5753+
start_to_close_timeout=start_to_close_timeout,
57165754
cancellation_type=cancellation_type,
57175755
headers=headers,
57185756
summary=summary,
@@ -5726,6 +5764,8 @@ async def execute_operation(
57265764
*,
57275765
output_type: type[OutputT] | None = None,
57285766
schedule_to_close_timeout: timedelta | None = None,
5767+
schedule_to_start_timeout: timedelta | None = None,
5768+
start_to_close_timeout: timedelta | None = None,
57295769
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
57305770
headers: Mapping[str, str] | None = None,
57315771
summary: str | None = None,
@@ -5735,6 +5775,8 @@ async def execute_operation(
57355775
input,
57365776
output_type=output_type,
57375777
schedule_to_close_timeout=schedule_to_close_timeout,
5778+
schedule_to_start_timeout=schedule_to_start_timeout,
5779+
start_to_close_timeout=start_to_close_timeout,
57385780
cancellation_type=cancellation_type,
57395781
headers=headers,
57405782
summary=summary,

tests/nexus/test_workflow_caller_errors.py

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
ApplicationError,
3131
NexusOperationError,
3232
TimeoutError,
33+
TimeoutType,
3334
)
3435
from temporalio.testing import WorkflowEnvironment
3536
from temporalio.worker import Worker
@@ -323,6 +324,149 @@ async def test_error_raised_by_timeout_of_nexus_start_operation(
323324
assert capturer.find_log("unexpected cancellation reason") is None
324325

325326

327+
# Schedule to start timeout test
328+
@service_handler
329+
class ScheduleToStartTimeoutTestService:
330+
@sync_operation
331+
async def expect_schedule_to_start_timeout(
332+
self, ctx: StartOperationContext, _input: None
333+
) -> None:
334+
try:
335+
await asyncio.wait_for(ctx.task_cancellation.wait_until_cancelled(), 1)
336+
except asyncio.TimeoutError:
337+
raise ApplicationError("expected cancel", non_retryable=True)
338+
339+
340+
@workflow.defn
341+
class ScheduleToStartTimeoutTestCallerWorkflow:
342+
@workflow.init
343+
def __init__(self):
344+
self.nexus_client = workflow.create_nexus_client(
345+
service=ScheduleToStartTimeoutTestService,
346+
endpoint=make_nexus_endpoint_name(workflow.info().task_queue),
347+
)
348+
349+
@workflow.run
350+
async def run(self) -> None:
351+
await self.nexus_client.execute_operation(
352+
ScheduleToStartTimeoutTestService.expect_schedule_to_start_timeout,
353+
None,
354+
output_type=None,
355+
schedule_to_start_timeout=timedelta(seconds=0.1),
356+
)
357+
358+
359+
async def test_error_raised_by_schedule_to_start_timeout_of_nexus_operation(
360+
client: Client, env: WorkflowEnvironment
361+
):
362+
if env.supports_time_skipping:
363+
pytest.skip("Nexus tests don't work with time-skipping server")
364+
365+
task_queue = str(uuid.uuid4())
366+
async with Worker(
367+
client,
368+
nexus_service_handlers=[ScheduleToStartTimeoutTestService()],
369+
workflows=[ScheduleToStartTimeoutTestCallerWorkflow],
370+
task_queue=task_queue,
371+
nexus_task_executor=concurrent.futures.ThreadPoolExecutor(),
372+
):
373+
await env.create_nexus_endpoint(
374+
make_nexus_endpoint_name(task_queue), task_queue
375+
)
376+
try:
377+
await client.execute_workflow(
378+
ScheduleToStartTimeoutTestCallerWorkflow.run,
379+
id=str(uuid.uuid4()),
380+
task_queue=task_queue,
381+
)
382+
except Exception as err:
383+
assert isinstance(err, WorkflowFailureError)
384+
assert isinstance(err.__cause__, NexusOperationError)
385+
assert isinstance(err.__cause__.__cause__, TimeoutError)
386+
timeout_err = err.__cause__.__cause__
387+
assert timeout_err.type == TimeoutType.SCHEDULE_TO_START
388+
else:
389+
pytest.fail(
390+
"Expected exception due to schedule to start timeout of nexus operation"
391+
)
392+
393+
394+
# Start to close timeout test
395+
396+
397+
class OperationThatExpectsStartToCloseTimeoutAsync(OperationHandler[None, None]):
398+
async def start(
399+
self, ctx: StartOperationContext, input: None
400+
) -> StartOperationResultAsync:
401+
return StartOperationResultAsync("fake-token")
402+
403+
async def cancel(self, ctx: CancelOperationContext, token: str) -> None:
404+
pass
405+
406+
407+
@service_handler
408+
class StartToCloseTimeoutTestService:
409+
@operation_handler
410+
def expect_start_to_close_timeout(self) -> OperationHandler[None, None]:
411+
return OperationThatExpectsStartToCloseTimeoutAsync()
412+
413+
414+
@workflow.defn
415+
class StartToCloseTimeoutTestCallerWorkflow:
416+
@workflow.init
417+
def __init__(
418+
self,
419+
):
420+
self.nexus_client = workflow.create_nexus_client(
421+
service=StartToCloseTimeoutTestService,
422+
endpoint=make_nexus_endpoint_name(workflow.info().task_queue),
423+
)
424+
425+
@workflow.run
426+
async def run(self) -> None:
427+
op_handle = await self.nexus_client.start_operation(
428+
StartToCloseTimeoutTestService.expect_start_to_close_timeout,
429+
None,
430+
start_to_close_timeout=timedelta(seconds=0.1),
431+
)
432+
await op_handle
433+
434+
435+
async def test_error_raised_by_start_to_close_timeout_of_nexus_operation(
436+
client: Client, env: WorkflowEnvironment
437+
):
438+
if env.supports_time_skipping:
439+
pytest.skip("Nexus tests don't work with time-skipping server")
440+
441+
task_queue = str(uuid.uuid4())
442+
async with Worker(
443+
client,
444+
nexus_service_handlers=[StartToCloseTimeoutTestService()],
445+
workflows=[StartToCloseTimeoutTestCallerWorkflow],
446+
task_queue=task_queue,
447+
nexus_task_executor=concurrent.futures.ThreadPoolExecutor(),
448+
):
449+
await env.create_nexus_endpoint(
450+
make_nexus_endpoint_name(task_queue), task_queue
451+
)
452+
try:
453+
await client.execute_workflow(
454+
StartToCloseTimeoutTestCallerWorkflow.run,
455+
id=str(uuid.uuid4()),
456+
task_queue=task_queue,
457+
)
458+
except Exception as err:
459+
assert isinstance(err, WorkflowFailureError)
460+
assert isinstance(err.__cause__, NexusOperationError)
461+
timeout_err = err.__cause__.__cause__
462+
assert isinstance(timeout_err, TimeoutError)
463+
assert timeout_err.type == TimeoutType.START_TO_CLOSE
464+
else:
465+
pytest.fail(
466+
"Expected exception due to start to close timeout of nexus operation"
467+
)
468+
469+
326470
# Cancellation timeout test
327471

328472

0 commit comments

Comments
 (0)