Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/uipath/runtime/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ class UiPathExecuteOptions(BaseModel):
default=None,
description="List of nodes or '*' to break on all steps.",
)
wait_for_triggers: bool = Field(
default=False,
description="When True, poll triggers until completion instead of suspending. "
"This keeps the process running and automatically resumes when triggers complete.",
)
trigger_poll_interval: float = Field(
default=5.0,
description="Seconds between poll attempts when wait_for_triggers is True.",
ge=0.1,
)

model_config = {"arbitrary_types_allowed": True, "extra": "allow"}

Expand Down
105 changes: 45 additions & 60 deletions src/uipath/runtime/debug/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import logging
from typing import Any, AsyncGenerator, cast

from uipath.core.errors import UiPathPendingTriggerError

from uipath.runtime.base import (
UiPathExecuteOptions,
UiPathRuntimeProtocol,
Expand All @@ -25,7 +23,7 @@
UiPathRuntimeResult,
UiPathRuntimeStatus,
)
from uipath.runtime.resumable.protocols import UiPathResumeTriggerReaderProtocol
from uipath.runtime.resumable.polling import TriggerPoller
from uipath.runtime.resumable.runtime import UiPathResumableRuntime
from uipath.runtime.resumable.trigger import (
UiPathResumeTrigger,
Expand Down Expand Up @@ -203,8 +201,7 @@ async def _stream_and_debug(
)
else:
trigger_data = await self._poll_trigger(
final_result.trigger,
self.delegate.trigger_manager,
final_result.trigger
)
resume_data = {interrupt_id: trigger_data}
except UiPathDebugQuitError:
Expand Down Expand Up @@ -245,77 +242,65 @@ async def dispose(self) -> None:
logger.warning(f"Error disconnecting debug bridge: {e}")

async def _poll_trigger(
self, trigger: UiPathResumeTrigger, reader: UiPathResumeTriggerReaderProtocol
self, trigger: UiPathResumeTrigger
) -> dict[str, Any] | None:
"""Poll a resume trigger until data is available.

Args:
trigger: The trigger to poll
reader: The trigger reader to use for polling

Returns:
Resume data when available, or None if polling exhausted
Resume data when available, or None if polling was stopped

Raises:
UiPathDebugQuitError: If quit is requested during polling
"""
attempt = 0
while True:
attempt += 1

try:
resume_data = await reader.read_trigger(trigger)

if resume_data is not None:
return resume_data

await self.debug_bridge.emit_state_update(
UiPathRuntimeStateEvent(
node_name="<polling>",
payload={
"attempt": attempt,
},
)
self._quit_requested = False

async def on_poll_attempt(attempt: int, info: str | None) -> None:
"""Callback for each poll attempt."""
payload: dict[str, Any] = {"attempt": attempt}
if info:
payload["info"] = info
await self.debug_bridge.emit_state_update(
UiPathRuntimeStateEvent(
node_name="<polling>",
payload=payload,
)
)

await self._wait_with_quit_check()

except UiPathDebugQuitError:
raise

except UiPathPendingTriggerError as e:
await self.debug_bridge.emit_state_update(
UiPathRuntimeStateEvent(
node_name="<polling>",
payload={
"attempt": attempt,
"info": str(e),
},
)
async def should_stop() -> bool:
"""Check if quit was requested."""
# Check for termination request with a short timeout
try:
term_task = asyncio.create_task(self.debug_bridge.wait_for_terminate())
done, _ = await asyncio.wait(
{term_task},
timeout=0.01, # Very short timeout just to check
)
if term_task in done:
self._quit_requested = True
return True
else:
term_task.cancel()
try:
await term_task
except asyncio.CancelledError:
pass
except Exception:
pass
return False

await self._wait_with_quit_check()

async def _wait_with_quit_check(self) -> None:
"""Wait for specified seconds, but allow quit command to interrupt.

Raises:
UiPathDebugQuitError: If quit is requested during wait
"""
sleep_task = asyncio.create_task(asyncio.sleep(self.trigger_poll_interval))
term_task = asyncio.create_task(self.debug_bridge.wait_for_terminate())

done, pending = await asyncio.wait(
{sleep_task, term_task},
return_when=asyncio.FIRST_COMPLETED,
poller = TriggerPoller(
reader=self.delegate.trigger_manager,
poll_interval=self.trigger_poll_interval,
on_poll_attempt=on_poll_attempt,
should_stop=should_stop,
)

for task in pending:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
result = await poller.poll_trigger(trigger)

if term_task in done:
if self._quit_requested:
raise UiPathDebugQuitError("Debugging terminated during polling.")

return result
2 changes: 2 additions & 0 deletions src/uipath/runtime/resumable/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module for resumable runtime features."""

from uipath.runtime.resumable.polling import TriggerPoller
from uipath.runtime.resumable.protocols import (
UiPathResumableStorageProtocol,
UiPathResumeTriggerCreatorProtocol,
Expand All @@ -20,4 +21,5 @@
"UiPathResumeTrigger",
"UiPathResumeTriggerType",
"UiPathApiTrigger",
"TriggerPoller",
]
118 changes: 118 additions & 0 deletions src/uipath/runtime/resumable/polling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Trigger polling utilities for resumable runtime."""

import asyncio
import logging
from typing import Any, Callable, Coroutine

from uipath.core.errors import UiPathPendingTriggerError

from uipath.runtime.resumable.protocols import UiPathResumeTriggerReaderProtocol
from uipath.runtime.resumable.trigger import UiPathResumeTrigger

logger = logging.getLogger(__name__)


class TriggerPoller:
"""Utility for polling resume triggers until completion.

This class provides reusable polling logic for waiting on triggers
to complete, used by both debug runtime and resumable runtime when
wait_for_triggers is enabled.
"""

def __init__(
self,
reader: UiPathResumeTriggerReaderProtocol,
poll_interval: float = 5.0,
on_poll_attempt: Callable[[int, str | None], Coroutine[Any, Any, None]]
| None = None,
should_stop: Callable[[], Coroutine[Any, Any, bool]] | None = None,
):
"""Initialize the trigger poller.

Args:
reader: The trigger reader to use for polling
poll_interval: Seconds between poll attempts
on_poll_attempt: Optional callback for each poll attempt (attempt_num, info)
should_stop: Optional async callback to check if polling should stop early
"""
self.reader = reader
self.poll_interval = poll_interval
self.on_poll_attempt = on_poll_attempt
self.should_stop = should_stop

async def poll_trigger(self, trigger: UiPathResumeTrigger) -> Any | None:
"""Poll a single trigger until data is available.

Args:
trigger: The trigger to poll

Returns:
Resume data when available, or None if polling was stopped

Raises:
Exception: If trigger reading fails with non-pending error
"""
attempt = 0
while True:
attempt += 1

# Check if we should stop
if self.should_stop and await self.should_stop():
logger.debug("Polling stopped by should_stop callback")
return None

try:
resume_data = await self.reader.read_trigger(trigger)

if resume_data is not None:
logger.debug(
f"Trigger {trigger.interrupt_id} completed after {attempt} attempts"
)
return resume_data

# Notify about poll attempt
if self.on_poll_attempt:
await self.on_poll_attempt(attempt, None)

await asyncio.sleep(self.poll_interval)

except UiPathPendingTriggerError as e:
# Trigger still pending, notify and continue polling
if self.on_poll_attempt:
await self.on_poll_attempt(attempt, str(e))

await asyncio.sleep(self.poll_interval)

async def poll_all_triggers(
self, triggers: list[UiPathResumeTrigger]
) -> dict[str, Any]:
"""Poll all triggers until they complete.

Args:
triggers: List of triggers to poll

Returns:
Dict mapping interrupt_id to resume data for completed triggers
"""
resume_map: dict[str, Any] = {}

# Poll triggers concurrently
async def poll_single(trigger: UiPathResumeTrigger) -> tuple[str | None, Any]:
data = await self.poll_trigger(trigger)
return trigger.interrupt_id, data

results = await asyncio.gather(
*[poll_single(trigger) for trigger in triggers],
return_exceptions=True,
)

for result in results:
if isinstance(result, Exception):
logger.error(f"Trigger polling failed: {result}")
raise result
interrupt_id, data = result
if interrupt_id and data is not None:
resume_map[interrupt_id] = data

return resume_map
Loading
Loading