-
Notifications
You must be signed in to change notification settings - Fork 1
feat: resume runtime on fired triggers #70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,11 +12,15 @@ | |
| ) | ||
| from uipath.runtime.debug.breakpoint import UiPathBreakpointResult | ||
| from uipath.runtime.events import UiPathRuntimeEvent | ||
| from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus | ||
| from uipath.runtime.result import ( | ||
| UiPathRuntimeResult, | ||
| UiPathRuntimeStatus, | ||
| ) | ||
| from uipath.runtime.resumable.protocols import ( | ||
| UiPathResumableStorageProtocol, | ||
| UiPathResumeTriggerProtocol, | ||
| ) | ||
| from uipath.runtime.resumable.trigger import UiPathResumeTrigger | ||
| from uipath.runtime.schema import UiPathRuntimeSchema | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -68,12 +72,28 @@ async def execute( | |
| """ | ||
| # If resuming, restore trigger from storage | ||
| if options and options.resume: | ||
| # restore trigger from storage | ||
| input = await self._restore_resume_input(input) | ||
|
|
||
| # Execute the delegate | ||
| result = await self.delegate.execute(input, options=options) | ||
| # If suspended, create and persist trigger | ||
| return await self._handle_suspension(result) | ||
| while True: | ||
| # Execute the delegate | ||
| result = await self.delegate.execute(input, options=options) | ||
| # If suspended, create and persist trigger | ||
| suspension_result = await self._handle_suspension(result) | ||
|
|
||
| # check if any trigger may be resumed | ||
| if not suspension_result.is_suspended() or not ( | ||
| fired_triggers := await self._restore_resume_input(None) | ||
| ): | ||
| return suspension_result | ||
|
|
||
| # Note: when resuming a job, orchestrator deletes all triggers associated with it, | ||
| # thus we can resume the runtime at this point without worrying a trigger may be fired 'twice' | ||
| input = fired_triggers | ||
| if not options: | ||
| options = UiPathExecuteOptions(resume=True) | ||
| else: | ||
| options.resume = True | ||
|
|
||
| async def stream( | ||
| self, | ||
|
|
@@ -94,15 +114,31 @@ async def stream( | |
| input = await self._restore_resume_input(input) | ||
|
|
||
| final_result: UiPathRuntimeResult | None = None | ||
| async for event in self.delegate.stream(input, options=options): | ||
| if isinstance(event, UiPathRuntimeResult): | ||
| final_result = event | ||
| else: | ||
| yield event | ||
| while True: | ||
| async for event in self.delegate.stream(input, options=options): | ||
| if isinstance(event, UiPathRuntimeResult): | ||
| final_result = event | ||
| else: | ||
| yield event | ||
|
|
||
| # If suspended, create and persist trigger | ||
| if final_result: | ||
| suspension_result = await self._handle_suspension(final_result) | ||
|
|
||
| if suspension_result.status != UiPathRuntimeStatus.SUSPENDED or not ( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We either use is_suspended everywhere or just check for UiPathRuntimeStatus everywhere (I would incline towards the latter in this low-level concepts)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch, forgot to replace it here. |
||
| fired_triggers := await self._restore_resume_input(None) | ||
| ): | ||
| yield suspension_result | ||
| return | ||
|
|
||
| # If suspended, create and persist trigger | ||
| if final_result: | ||
| yield await self._handle_suspension(final_result) | ||
| # Note: when resuming a job, orchestrator deletes all triggers associated with it, | ||
| # thus we can resume the runtime at this point without worrying a trigger may be fired 'twice' | ||
| input = fired_triggers | ||
|
|
||
| if not options: | ||
| options = UiPathStreamOptions(resume=True) | ||
| else: | ||
| options.resume = True | ||
|
|
||
| async def _restore_resume_input( | ||
| self, input: dict[str, Any] | None | ||
|
|
@@ -142,6 +178,11 @@ async def _restore_resume_input( | |
| if not triggers: | ||
| return None | ||
|
|
||
| return await self._build_resume_map(triggers) | ||
|
|
||
| async def _build_resume_map( | ||
| self, triggers: list[UiPathResumeTrigger] | ||
| ) -> dict[str, Any]: | ||
| # Build resume map: {interrupt_id: resume_data} | ||
| resume_map: dict[str, Any] = {} | ||
| for trigger in triggers: | ||
|
|
@@ -166,11 +207,10 @@ async def _handle_suspension( | |
| Args: | ||
| result: The execution result to check for suspension | ||
| """ | ||
| # Only handle suspensions | ||
| if result.status != UiPathRuntimeStatus.SUSPENDED: | ||
| return result | ||
|
|
||
| if isinstance(result, UiPathBreakpointResult): | ||
| # Only handle interrupt suspensions | ||
| if result.status != UiPathRuntimeStatus.SUSPENDED or isinstance( | ||
| result, UiPathBreakpointResult | ||
| ): | ||
| return result | ||
|
|
||
| suspended_result = UiPathRuntimeResult( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and change the return with
I remember issues with returning during a while loop in an async generator