From 2c07433fb22b22bf43a1abcf15f5a609cc65fb3d Mon Sep 17 00:00:00 2001 From: Radu Mihai Gheorghe Date: Wed, 4 Mar 2026 18:45:35 +0200 Subject: [PATCH 1/3] feat: add watchdog for runtime session handling --- .gitignore | 6 +- src/uipath_mcp/_cli/_runtime/_runtime.py | 71 ++++++- src/uipath_mcp/_cli/_runtime/_session.py | 57 ++++++ src/uipath_mcp/_cli/_runtime/_watchdog.py | 138 ++++++++++++++ tests/test_watchdog.py | 215 ++++++++++++++++++++++ 5 files changed, 484 insertions(+), 3 deletions(-) create mode 100644 src/uipath_mcp/_cli/_runtime/_watchdog.py create mode 100644 tests/test_watchdog.py diff --git a/.gitignore b/.gitignore index 854d55b..c41c1bb 100644 --- a/.gitignore +++ b/.gitignore @@ -165,7 +165,11 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +.idea/ + +.vscode/ + +.claude/ # Ruff stuff: .ruff_cache/ diff --git a/src/uipath_mcp/_cli/_runtime/_runtime.py b/src/uipath_mcp/_cli/_runtime/_runtime.py index e1ae42a..4d56a57 100644 --- a/src/uipath_mcp/_cli/_runtime/_runtime.py +++ b/src/uipath_mcp/_cli/_runtime/_runtime.py @@ -37,8 +37,14 @@ from .._utils._config import McpServer from ._context import UiPathServerType from ._exception import McpErrorCode, UiPathMcpRuntimeError -from ._session import BaseSessionServer, StdioSessionServer, StreamableHttpSessionServer +from ._session import ( + BaseSessionServer, + SessionHealthInfo, + StdioSessionServer, + StreamableHttpSessionServer, +) from ._token_refresh import TokenRefresher +from ._watchdog import SessionWatchdog logger = logging.getLogger(__name__) tracer = trace.get_tracer(__name__) @@ -86,6 +92,7 @@ def __init__( self._http_stderr_drain_task: asyncio.Task[None] | None = None self._http_server_stderr_lines: list[str] = [] self._uipath = UiPath() + self._watchdog: SessionWatchdog | None = None self._token_refresher: TokenRefresher | None = None self._cleanup_done = False @@ -118,6 +125,46 @@ def _validate_auth(self) -> None: UiPathErrorCategory.SYSTEM, ) + def get_sessions(self) -> dict[str, SessionHealthInfo]: + """Return health info for all active sessions (SessionProvider protocol).""" + return { + sid: session.get_health_info() + for sid, session in self._session_servers.items() + } + + async def remove_session(self, session_id: str, reason: str) -> None: + """Remove and stop a session by ID (SessionProvider protocol).""" + session_server = self._session_servers.pop(session_id, None) + if session_server is not None: + logger.warning( + f"Removing session {session_id}: {reason}" + ) + try: + await session_server.stop() + except Exception: + logger.error( + f"Error stopping session {session_id} during watchdog removal", + exc_info=True, + ) + await self._close_session_on_server(session_id) + + async def _close_session_on_server(self, session_id: str) -> None: + """Notify the UiPath server to remove a session so it stops sending messages.""" + try: + await self._uipath.api_client.request_async( + "DELETE", + f"agenthub_/mcp/{self._folder_key}/{self.slug}", + headers={"mcp-session-id": session_id}, + ) + logger.info(f"Notified server of session closure: {session_id}") + except HTTPStatusError as e: + if e.response.status_code == 404: + logger.info(f"Session {session_id} already removed server-side") + else: + logger.error(f"Error closing session {session_id} on server: {e}") + except Exception as e: + logger.error(f"Error closing session {session_id} on server: {e}") + async def get_schema(self) -> UiPathRuntimeSchema: """Get schema for this MCP runtime. @@ -240,6 +287,9 @@ async def _run_server(self) -> UiPathRuntimeResult: run_task = asyncio.create_task(self._signalr_client.run()) cancel_task = asyncio.create_task(self._cancel_event.wait()) self._keep_alive_task = asyncio.create_task(self._keep_alive()) + + self._watchdog = SessionWatchdog(self) + self._watchdog.start() self._token_refresher.start() try: @@ -312,6 +362,10 @@ async def _cleanup(self) -> None: except asyncio.CancelledError: pass + if self._watchdog: + await self._watchdog.stop() + self._watchdog = None + for session_id, session_server in list(self._session_servers.items()): try: await session_server.stop() @@ -367,6 +421,10 @@ async def _handle_signalr_message(self, args: list[str]) -> None: """ Handle incoming SignalR messages. """ + + if self._cleanup_done: + return + if len(args) < 2: logger.error(f"Received invalid websocket message arguments: {args}") return @@ -769,7 +827,15 @@ async def on_keep_alive_response( logger.error(f"Error during keep-alive: {response.error}") return session_ids = response.result - logger.info(f"Active sessions: {session_ids}") + logger.info(f"Server active sessions: {session_ids}") + runtime_sessions = {} + for sid, s in self._session_servers.items(): + health = s.get_health_info() + runtime_sessions[sid] = { + "task_done": health.task_done, + "active_requests": health.active_request_count, + } + logger.info(f"Runtime active sessions: {runtime_sessions}") # If there are no active sessions and this is a sandbox environment # We need to cancel the runtime # eg: when user kills the agent that triggered the runtime, before we subscribe to events @@ -783,6 +849,7 @@ async def on_keep_alive_response( ) self._cancel_event.set() + if self._signalr_client: logger.info("Sending keep-alive ping...") await self._signalr_client.send( diff --git a/src/uipath_mcp/_cli/_runtime/_session.py b/src/uipath_mcp/_cli/_runtime/_session.py index 6eee861..8c92b0a 100644 --- a/src/uipath_mcp/_cli/_runtime/_session.py +++ b/src/uipath_mcp/_cli/_runtime/_session.py @@ -2,9 +2,12 @@ import io import logging import tempfile +import time from abc import ABC, abstractmethod +from dataclasses import dataclass from typing import Any +from anyio import EndOfStream from mcp import StdioServerParameters, stdio_client from mcp.client.streamable_http import streamable_http_client from mcp.shared.message import SessionMessage @@ -28,6 +31,19 @@ RETRY_DELAY = 1 +@dataclass +class SessionHealthInfo: + """Health information for a session, used by the watchdog.""" + + session_id: str + transport_type: str + task_done: bool + task_exception: BaseException | None + last_activity_time: float + queue_size: int + active_request_count: int + + class BaseSessionServer(ABC): """Base class with transport-agnostic message relay logic.""" @@ -48,9 +64,16 @@ def __init__( self._active_requests: dict[str, str] = {} self._last_request_id: str | None = None self._last_message_id: str | None = None + self._last_activity_time: float = time.monotonic() self._uipath = uipath self._mcp_tracer = McpTracer(tracer, logger) + @property + @abstractmethod + def transport_type(self) -> str: + """Returns the transport type identifier (e.g. 'stdio', 'streamable-http').""" + ... + @property @abstractmethod def output(self) -> str | None: @@ -79,8 +102,28 @@ async def stop(self) -> None: self._read_stream = None self._write_stream = None + def get_health_info(self) -> SessionHealthInfo: + """Return health information for this session.""" + task_done = self._run_task.done() if self._run_task else True + task_exception: BaseException | None = None + if task_done and self._run_task is not None: + try: + task_exception = self._run_task.exception() + except (asyncio.CancelledError, asyncio.InvalidStateError): + pass + return SessionHealthInfo( + session_id=self._session_id, + transport_type=self.transport_type, + task_done=task_done, + task_exception=task_exception, + last_activity_time=self._last_activity_time, + queue_size=self._message_queue.qsize(), + active_request_count=len(self._active_requests), + ) + async def on_message_received(self, request_id: str) -> None: """Get new incoming messages from UiPath MCP Server.""" + self._last_activity_time = time.monotonic() for attempt in range(MAX_RETRIES + 1): try: await self._get_messages_internal(request_id) @@ -115,6 +158,7 @@ async def _relay_messages(self) -> None: break session_message = await self._read_stream.receive() + self._last_activity_time = time.monotonic() if isinstance(session_message, Exception): logger.error(f"Received error: {session_message}") continue @@ -137,6 +181,11 @@ async def _relay_messages(self) -> None: # For non-responses, use the last known request_id if self._last_request_id is not None: await self._send_message(message, self._last_request_id) + except EndOfStream: + logger.warning( + f"Read stream closed for session {self._session_id}" + ) + break except Exception as e: if session_message: logger.info(session_message) @@ -292,6 +341,10 @@ class StdioSessionServer(BaseSessionServer): _server_stderr_output: str | None = None + @property + def transport_type(self) -> str: + return "stdio" + @property def output(self) -> str | None: """Returns the captured stderr output from the MCP server process.""" @@ -345,6 +398,10 @@ async def _run_server(self, server_params: StdioServerParameters) -> None: class StreamableHttpSessionServer(BaseSessionServer): """Manages an HTTP connection to a shared streamable-http server for a specific session.""" + @property + def transport_type(self) -> str: + return "streamable-http" + @property def output(self) -> str | None: """Returns captured output from the server process, if any.""" diff --git a/src/uipath_mcp/_cli/_runtime/_watchdog.py b/src/uipath_mcp/_cli/_runtime/_watchdog.py new file mode 100644 index 0000000..2e688a0 --- /dev/null +++ b/src/uipath_mcp/_cli/_runtime/_watchdog.py @@ -0,0 +1,138 @@ +import asyncio +import logging +import time +from typing import Protocol + +from opentelemetry import trace + +from ._session import SessionHealthInfo + +logger = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + +WATCHDOG_CHECK_INTERVAL = 30 # seconds +SESSION_IDLE_TIMEOUT = 60 # 1 minute for testing purposes - to be changed to 1 hour + + +class SessionProvider(Protocol): + """Protocol for accessing and managing sessions.""" + + def get_sessions(self) -> dict[str, SessionHealthInfo]: ... + + async def remove_session(self, session_id: str, reason: str) -> None: ... + + +class SessionWatchdog: + """Periodically checks session health and removes dead or idle sessions.""" + + def __init__( + self, + provider: SessionProvider, + check_interval: float = WATCHDOG_CHECK_INTERVAL, + idle_timeout: float = SESSION_IDLE_TIMEOUT, + ): + self._provider = provider + self._check_interval = check_interval + self._idle_timeout = idle_timeout + self._task: asyncio.Task[None] | None = None + self._cancel_event = asyncio.Event() + + def start(self) -> None: + """Start the watchdog background task.""" + if self._task is not None: + return + self._cancel_event.clear() + self._task = asyncio.create_task(self._run()) + logger.info("Session watchdog started") + + async def stop(self) -> None: + """Stop the watchdog background task.""" + if self._task is None: + return + self._cancel_event.set() + try: + await asyncio.wait_for(self._task, timeout=5.0) + except (asyncio.TimeoutError, asyncio.CancelledError): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + logger.info("Session watchdog stopped") + + async def _run(self) -> None: + """Main watchdog loop.""" + try: + while not self._cancel_event.is_set(): + try: + await self._check_sessions() + except Exception: + logger.error("Error during watchdog check cycle", exc_info=True) + + try: + await asyncio.wait_for( + self._cancel_event.wait(), timeout=self._check_interval + ) + break # cancel_event was set + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + logger.info("Session watchdog task cancelled") + raise + + async def _check_sessions(self) -> None: + """Inspect all sessions and remove dead or idle ones.""" + with tracer.start_as_current_span("watchdog.check_sessions") as span: + sessions = self._provider.get_sessions() + + if not sessions: + logger.debug("Watchdog check: no active sessions") + span.set_attribute("session_count", 0) + return + + now = time.monotonic() + removed_count = 0 + + for session_id, health in sessions.items(): + try: + transport = health.transport_type + if health.task_done: + if health.task_exception is not None: + logger.error( + f"Watchdog: {transport} session {session_id} task failed " + f"with exception: {health.task_exception}" + ) + else: + logger.warning( + f"Watchdog: {transport} session {session_id} task " + f"completed unexpectedly" + ) + await self._provider.remove_session( + session_id, reason="dead task" + ) + removed_count += 1 + continue + + idle_duration = now - health.last_activity_time + if idle_duration > self._idle_timeout: + logger.warning( + f"Watchdog: {transport} session {session_id} idle for " + f"{idle_duration:.0f}s (timeout: {self._idle_timeout}s)" + ) + await self._provider.remove_session( + session_id, reason="idle timeout" + ) + removed_count += 1 + except Exception: + logger.error( + f"Watchdog: error checking session {session_id}", + exc_info=True, + ) + + span.set_attribute("session_count", len(sessions)) + span.set_attribute("removed_count", removed_count) + logger.info( + f"Watchdog check: {len(sessions)} session(s), {removed_count} removed" + ) + diff --git a/tests/test_watchdog.py b/tests/test_watchdog.py new file mode 100644 index 0000000..ebcbd07 --- /dev/null +++ b/tests/test_watchdog.py @@ -0,0 +1,215 @@ +import asyncio +import time + +import pytest + +from uipath_mcp._cli._runtime._session import SessionHealthInfo +from uipath_mcp._cli._runtime._watchdog import SessionWatchdog + + +class MockSessionProvider: + """Mock provider for testing the watchdog.""" + + def __init__(self) -> None: + self.sessions: dict[str, SessionHealthInfo] = {} + self.removed: list[tuple[str, str]] = [] + + def get_sessions(self) -> dict[str, SessionHealthInfo]: + return dict(self.sessions) + + async def remove_session(self, session_id: str, reason: str) -> None: + self.removed.append((session_id, reason)) + self.sessions.pop(session_id, None) + + +def _make_health( + session_id: str = "test-session", + transport_type: str = "stdio", + task_done: bool = False, + task_exception: BaseException | None = None, + last_activity_time: float | None = None, + queue_size: int = 0, + active_request_count: int = 0, +) -> SessionHealthInfo: + return SessionHealthInfo( + session_id=session_id, + transport_type=transport_type, + task_done=task_done, + task_exception=task_exception, + last_activity_time=last_activity_time if last_activity_time is not None else time.monotonic(), + queue_size=queue_size, + active_request_count=active_request_count, + ) + + +@pytest.mark.asyncio +async def test_detects_dead_task_with_exception() -> None: + """Watchdog should remove sessions whose task has finished with an exception.""" + provider = MockSessionProvider() + provider.sessions["s1"] = _make_health( + session_id="s1", + task_done=True, + task_exception=RuntimeError("boom"), + ) + watchdog = SessionWatchdog(provider, check_interval=1, idle_timeout=900) + await watchdog._check_sessions() + + assert len(provider.removed) == 1 + assert provider.removed[0] == ("s1", "dead task") + + +@pytest.mark.asyncio +async def test_detects_dead_task_without_exception() -> None: + """Watchdog should remove sessions whose task completed without exception.""" + provider = MockSessionProvider() + provider.sessions["s1"] = _make_health( + session_id="s1", + task_done=True, + task_exception=None, + ) + watchdog = SessionWatchdog(provider, check_interval=1, idle_timeout=900) + await watchdog._check_sessions() + + assert len(provider.removed) == 1 + assert provider.removed[0] == ("s1", "dead task") + + +@pytest.mark.asyncio +async def test_detects_idle_timeout() -> None: + """Watchdog should remove sessions that exceed the idle timeout.""" + provider = MockSessionProvider() + provider.sessions["s1"] = _make_health( + session_id="s1", + task_done=False, + last_activity_time=time.monotonic() - 1000, # idle for 1000s + ) + watchdog = SessionWatchdog(provider, check_interval=1, idle_timeout=900) + await watchdog._check_sessions() + + assert len(provider.removed) == 1 + assert provider.removed[0] == ("s1", "idle timeout") + + +@pytest.mark.asyncio +async def test_healthy_session_not_removed() -> None: + """Watchdog should leave healthy, active sessions alone.""" + provider = MockSessionProvider() + provider.sessions["s1"] = _make_health( + session_id="s1", + task_done=False, + last_activity_time=time.monotonic(), # just active + ) + watchdog = SessionWatchdog(provider, check_interval=1, idle_timeout=900) + await watchdog._check_sessions() + + assert len(provider.removed) == 0 + assert "s1" in provider.sessions + + +@pytest.mark.asyncio +async def test_no_sessions_is_noop() -> None: + """Watchdog should handle empty session list gracefully.""" + provider = MockSessionProvider() + watchdog = SessionWatchdog(provider, check_interval=1, idle_timeout=900) + await watchdog._check_sessions() + + assert len(provider.removed) == 0 + + +@pytest.mark.asyncio +async def test_multiple_sessions_mixed() -> None: + """Watchdog should handle a mix of healthy, dead, and idle sessions.""" + provider = MockSessionProvider() + provider.sessions["healthy"] = _make_health( + session_id="healthy", task_done=False, last_activity_time=time.monotonic() + ) + provider.sessions["dead"] = _make_health( + session_id="dead", task_done=True, task_exception=RuntimeError("crash") + ) + provider.sessions["idle"] = _make_health( + session_id="idle", task_done=False, last_activity_time=time.monotonic() - 2000 + ) + watchdog = SessionWatchdog(provider, check_interval=1, idle_timeout=900) + await watchdog._check_sessions() + + removed_ids = {r[0] for r in provider.removed} + assert removed_ids == {"dead", "idle"} + assert "healthy" in provider.sessions + + +@pytest.mark.asyncio +async def test_start_stop_lifecycle() -> None: + """Watchdog should start and stop cleanly.""" + provider = MockSessionProvider() + watchdog = SessionWatchdog(provider, check_interval=0.1, idle_timeout=900) + + watchdog.start() + assert watchdog._task is not None + assert not watchdog._task.done() + + await asyncio.sleep(0.15) # let at least one cycle run + + await watchdog.stop() + assert watchdog._task is None + + +@pytest.mark.asyncio +async def test_stop_when_not_started() -> None: + """Stopping a watchdog that was never started should be a no-op.""" + provider = MockSessionProvider() + watchdog = SessionWatchdog(provider, check_interval=1, idle_timeout=900) + await watchdog.stop() # should not raise + + +@pytest.mark.asyncio +async def test_start_idempotent() -> None: + """Calling start() twice should not create a second task.""" + provider = MockSessionProvider() + watchdog = SessionWatchdog(provider, check_interval=1, idle_timeout=900) + watchdog.start() + first_task = watchdog._task + watchdog.start() + assert watchdog._task is first_task + await watchdog.stop() + + +@pytest.mark.asyncio +async def test_error_in_remove_session_does_not_crash_watchdog() -> None: + """Watchdog should survive errors from remove_session.""" + + class FailingProvider(MockSessionProvider): + async def remove_session(self, session_id: str, reason: str) -> None: + raise RuntimeError("removal failed") + + provider = FailingProvider() + provider.sessions["s1"] = _make_health(session_id="s1", task_done=True) + watchdog = SessionWatchdog(provider, check_interval=1, idle_timeout=900) + + # Should not raise + await watchdog._check_sessions() + + +@pytest.mark.asyncio +async def test_dead_task_prioritized_over_idle() -> None: + """A dead task should be detected even if the session is also idle.""" + provider = MockSessionProvider() + provider.sessions["s1"] = _make_health( + session_id="s1", + task_done=True, + last_activity_time=time.monotonic() - 2000, + ) + watchdog = SessionWatchdog(provider, check_interval=1, idle_timeout=900) + await watchdog._check_sessions() + + assert len(provider.removed) == 1 + assert provider.removed[0] == ("s1", "dead task") # dead task, not idle timeout + + +@pytest.mark.asyncio +async def test_transport_type_in_health_info() -> None: + """SessionHealthInfo should carry the transport type.""" + stdio_health = _make_health(session_id="s1", transport_type="stdio") + http_health = _make_health(session_id="s2", transport_type="streamable-http") + + assert stdio_health.transport_type == "stdio" + assert http_health.transport_type == "streamable-http" From 9dfeb8d96950e9e780a0278ed7ecc7987630865c Mon Sep 17 00:00:00 2001 From: Radu Mihai Gheorghe Date: Mon, 23 Mar 2026 17:38:09 +0200 Subject: [PATCH 2/3] refact: remove verbose session deletion code --- src/uipath_mcp/_cli/_runtime/_runtime.py | 84 +++++++---------------- src/uipath_mcp/_cli/_runtime/_session.py | 6 +- src/uipath_mcp/_cli/_runtime/_watchdog.py | 5 +- tests/test_watchdog.py | 6 +- 4 files changed, 31 insertions(+), 70 deletions(-) diff --git a/src/uipath_mcp/_cli/_runtime/_runtime.py b/src/uipath_mcp/_cli/_runtime/_runtime.py index 4d56a57..f01bf75 100644 --- a/src/uipath_mcp/_cli/_runtime/_runtime.py +++ b/src/uipath_mcp/_cli/_runtime/_runtime.py @@ -133,37 +133,29 @@ def get_sessions(self) -> dict[str, SessionHealthInfo]: } async def remove_session(self, session_id: str, reason: str) -> None: - """Remove and stop a session by ID (SessionProvider protocol).""" + """Pop, stop, and clean up a single session (SessionProvider protocol).""" session_server = self._session_servers.pop(session_id, None) - if session_server is not None: - logger.warning( - f"Removing session {session_id}: {reason}" - ) - try: - await session_server.stop() - except Exception: - logger.error( - f"Error stopping session {session_id} during watchdog removal", - exc_info=True, - ) - await self._close_session_on_server(session_id) + if session_server is None: + return + + logger.warning(f"Removing session {session_id}: {reason}") - async def _close_session_on_server(self, session_id: str) -> None: - """Notify the UiPath server to remove a session so it stops sending messages.""" try: - await self._uipath.api_client.request_async( - "DELETE", - f"agenthub_/mcp/{self._folder_key}/{self.slug}", - headers={"mcp-session-id": session_id}, + await session_server.stop() + except Exception: + logger.error( + f"Error stopping session {session_id}", + exc_info=True, ) - logger.info(f"Notified server of session closure: {session_id}") - except HTTPStatusError as e: - if e.response.status_code == 404: - logger.info(f"Session {session_id} already removed server-side") + + if session_server.output: + if self.sandboxed: + self._session_output = session_server.output else: - logger.error(f"Error closing session {session_id} on server: {e}") - except Exception as e: - logger.error(f"Error closing session {session_id} on server: {e}") + logger.info(f"Session {session_id} output: {session_server.output}") + + if self.sandboxed: + self._cancel_event.set() async def get_schema(self) -> UiPathRuntimeSchema: """Get schema for this MCP runtime. @@ -366,11 +358,8 @@ async def _cleanup(self) -> None: await self._watchdog.stop() self._watchdog = None - for session_id, session_server in list(self._session_servers.items()): - try: - await session_server.stop() - except Exception as e: - logger.error(f"Error cleaning up session server {session_id}: {str(e)}") + for session_id in list(self._session_servers.keys()): + await self.remove_session(session_id, reason="runtime shutdown") # Stop the shared HTTP server process (streamable-http only) await self._stop_http_server_process() @@ -396,26 +385,8 @@ async def _handle_signalr_session_closed(self, args: list[str]) -> None: return session_id = args[0] - logger.info(f"Received closed signal for session {session_id}") - - try: - session_server = self._session_servers.pop(session_id, None) - if session_server: - await session_server.stop() - if session_server.output: - if self.sandboxed: - self._session_output = session_server.output - else: - logger.info( - f"Session {session_id} output: {session_server.output}" - ) - # If this is a sandboxed runtime for a specific session, cancel the execution - if self.sandboxed: - self._cancel_event.set() - - except Exception as e: - logger.error(f"Error terminating session {session_id}: {str(e)}") + await self.remove_session(session_id, reason="server closed") async def _handle_signalr_message(self, args: list[str]) -> None: """ @@ -619,13 +590,9 @@ async def _monitor_http_server_process(self) -> None: # Stop all HTTP sessions, they will fail on next request anyway for session_id, session_server in list(self._session_servers.items()): if isinstance(session_server, StreamableHttpSessionServer): - try: - await session_server.stop() - except Exception as e: - logger.error( - f"Error stopping session {session_id} after process crash: {e}" - ) - self._session_servers.pop(session_id, None) + await self.remove_session( + session_id, reason="http process crash" + ) except asyncio.CancelledError: pass @@ -833,7 +800,7 @@ async def on_keep_alive_response( health = s.get_health_info() runtime_sessions[sid] = { "task_done": health.task_done, - "active_requests": health.active_request_count, + "active_requests": len(s._active_requests), } logger.info(f"Runtime active sessions: {runtime_sessions}") # If there are no active sessions and this is a sandbox environment @@ -849,7 +816,6 @@ async def on_keep_alive_response( ) self._cancel_event.set() - if self._signalr_client: logger.info("Sending keep-alive ping...") await self._signalr_client.send( diff --git a/src/uipath_mcp/_cli/_runtime/_session.py b/src/uipath_mcp/_cli/_runtime/_session.py index 8c92b0a..6867077 100644 --- a/src/uipath_mcp/_cli/_runtime/_session.py +++ b/src/uipath_mcp/_cli/_runtime/_session.py @@ -41,7 +41,6 @@ class SessionHealthInfo: task_exception: BaseException | None last_activity_time: float queue_size: int - active_request_count: int class BaseSessionServer(ABC): @@ -118,7 +117,6 @@ def get_health_info(self) -> SessionHealthInfo: task_exception=task_exception, last_activity_time=self._last_activity_time, queue_size=self._message_queue.qsize(), - active_request_count=len(self._active_requests), ) async def on_message_received(self, request_id: str) -> None: @@ -182,9 +180,7 @@ async def _relay_messages(self) -> None: if self._last_request_id is not None: await self._send_message(message, self._last_request_id) except EndOfStream: - logger.warning( - f"Read stream closed for session {self._session_id}" - ) + logger.warning(f"Read stream closed for session {self._session_id}") break except Exception as e: if session_message: diff --git a/src/uipath_mcp/_cli/_runtime/_watchdog.py b/src/uipath_mcp/_cli/_runtime/_watchdog.py index 2e688a0..e3392cf 100644 --- a/src/uipath_mcp/_cli/_runtime/_watchdog.py +++ b/src/uipath_mcp/_cli/_runtime/_watchdog.py @@ -104,9 +104,9 @@ async def _check_sessions(self) -> None: f"with exception: {health.task_exception}" ) else: - logger.warning( + logger.info( f"Watchdog: {transport} session {session_id} task " - f"completed unexpectedly" + f"completed, cleaning up" ) await self._provider.remove_session( session_id, reason="dead task" @@ -135,4 +135,3 @@ async def _check_sessions(self) -> None: logger.info( f"Watchdog check: {len(sessions)} session(s), {removed_count} removed" ) - diff --git a/tests/test_watchdog.py b/tests/test_watchdog.py index ebcbd07..d845210 100644 --- a/tests/test_watchdog.py +++ b/tests/test_watchdog.py @@ -29,16 +29,16 @@ def _make_health( task_exception: BaseException | None = None, last_activity_time: float | None = None, queue_size: int = 0, - active_request_count: int = 0, ) -> SessionHealthInfo: return SessionHealthInfo( session_id=session_id, transport_type=transport_type, task_done=task_done, task_exception=task_exception, - last_activity_time=last_activity_time if last_activity_time is not None else time.monotonic(), + last_activity_time=last_activity_time + if last_activity_time is not None + else time.monotonic(), queue_size=queue_size, - active_request_count=active_request_count, ) From a32082ac69fefed96caf00e7611e1a8396b19163 Mon Sep 17 00:00:00 2001 From: Radu Mihai Gheorghe Date: Mon, 23 Mar 2026 18:03:13 +0200 Subject: [PATCH 3/3] refactor: solve runtime code inconsistencies --- src/uipath_mcp/_cli/_runtime/_runtime.py | 119 ++++++++++------------- 1 file changed, 53 insertions(+), 66 deletions(-) diff --git a/src/uipath_mcp/_cli/_runtime/_runtime.py b/src/uipath_mcp/_cli/_runtime/_runtime.py index f01bf75..d74ddac 100644 --- a/src/uipath_mcp/_cli/_runtime/_runtime.py +++ b/src/uipath_mcp/_cli/_runtime/_runtime.py @@ -295,8 +295,8 @@ async def _run_server(self) -> UiPathRuntimeResult: ) self._cancel_event.set() finally: - # Cancel any pending tasks gracefully - for task in [run_task, cancel_task, self._keep_alive_task]: + # Cancel pending tasks + for task in [run_task, cancel_task]: if task and not task.done(): task.cancel() try: @@ -322,7 +322,7 @@ async def _run_server(self) -> UiPathRuntimeResult: except Exception as e: if isinstance(e, UiPathMcpRuntimeError): raise - detail = f"Error: {str(e)}" + detail = f"Error: {e}" raise UiPathMcpRuntimeError( UiPathErrorCode.EXECUTION_ERROR, "MCP Runtime execution failed", @@ -370,16 +370,17 @@ async def _cleanup(self) -> None: try: await transport._ws.close() except Exception as e: - logger.error(f"Error closing SignalR WebSocket: {str(e)}") + logger.error(f"Error closing SignalR WebSocket: {e}") # Add a small delay to allow the server to shut down gracefully if sys.platform == "win32": await asyncio.sleep(0.5) async def _handle_signalr_session_closed(self, args: list[str]) -> None: - """ - Handle session closed by server. - """ + """Handle session closed by server.""" + if self._cleanup_done: + return + if len(args) < 1: logger.error(f"Received invalid websocket message arguments: {args}") return @@ -389,10 +390,7 @@ async def _handle_signalr_session_closed(self, args: list[str]) -> None: await self.remove_session(session_id, reason="server closed") async def _handle_signalr_message(self, args: list[str]) -> None: - """ - Handle incoming SignalR messages. - """ - + """Handle incoming SignalR messages.""" if self._cleanup_done: return @@ -421,7 +419,7 @@ async def _handle_signalr_message(self, args: list[str]) -> None: await session_server.start() except Exception as e: logger.error( - f"Error starting session server for session {session_id}: {str(e)}" + f"Error starting session server for session {session_id}: {e}" ) await self._on_session_start_error(session_id) raise @@ -435,7 +433,7 @@ async def _handle_signalr_message(self, args: list[str]) -> None: except Exception as e: logger.error( - f"Error handling websocket notification for session {session_id}: {str(e)}" + f"Error handling websocket notification for session {session_id}: {e}" ) async def _handle_signalr_error(self, error: Any) -> None: @@ -450,17 +448,21 @@ async def _handle_signalr_close(self) -> None: """Handle SignalR connection close event.""" logger.info("Websocket connection closed.") - async def _start_http_server_process(self) -> None: - """Spawn the streamable-http server process. - - The process is started once and shared across all sessions. - """ + def _get_server_env(self) -> dict[str, str]: + """Return server env vars, with os.environ merged in for Coded servers.""" env_vars = self._server.env.copy() if self.server_type is UiPathServerType.Coded: for name, value in os.environ.items(): if name not in env_vars: env_vars[name] = value + return env_vars + async def _start_http_server_process(self) -> None: + """Spawn the streamable-http server process. + + The process is started once and shared across all sessions. + """ + env_vars = self._get_server_env() merged_env = {**os.environ, **env_vars} if env_vars else None self._http_server_stderr_lines = [] self._http_server_process = await asyncio.create_subprocess_exec( @@ -501,7 +503,12 @@ async def _wait_for_http_server_ready( url = self._server.url if not url: - raise ValueError("streamable-http transport requires url in config") + raise UiPathMcpRuntimeError( + McpErrorCode.CONFIGURATION_ERROR, + "Missing URL for streamable-http server", + "Please specify a 'url' in the server configuration for streamable-http transport.", + UiPathErrorCategory.SYSTEM, + ) for attempt in range(max_retries): # Check if process has crashed @@ -602,14 +609,6 @@ async def _register(self) -> None: initialization_successful = False tools_result: ListToolsResult | None = None server_stderr_output = "" - env_vars = self._server.env - - # if server is Coded, include environment variables - if self.server_type is UiPathServerType.Coded: - for name, value in os.environ.items(): - # config env variables should have precedence over system ones - if name not in env_vars: - env_vars[name] = value try: if self._server.is_streamable_http: @@ -649,7 +648,7 @@ async def _register(self) -> None: server_params = StdioServerParameters( command=self._server.command, args=self._server.args, - env=env_vars, + env=self._get_server_env(), ) with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary: @@ -779,49 +778,39 @@ async def _on_session_start_error(self, session_id: str) -> None: f"Error sending session dispose signal to UiPath MCP Server: {e}" ) + async def _on_keep_alive_response(self, response: CompletionMessage) -> None: + """Handle keep-alive response: log session state, detect orphaned sandboxed runtimes.""" + if response.error: + logger.error(f"Error during keep-alive: {response.error}") + return + session_ids = response.result + logger.info(f"Server active sessions: {session_ids}") + runtime_sessions = {} + for sid, s in self._session_servers.items(): + health = s.get_health_info() + runtime_sessions[sid] = { + "task_done": health.task_done, + "active_requests": len(s._active_requests), + } + logger.info(f"Runtime active sessions: {runtime_sessions}") + # If there are no active sessions and this is a sandbox environment + # We need to cancel the runtime + # eg: when user kills the agent that triggered the runtime, before we subscribe to events + if not session_ids and self.sandboxed and not self._cancel_event.is_set(): + logger.warning("No active sessions, cancelling sandboxed runtime...") + self._cancel_event.set() + async def _keep_alive(self) -> None: - """ - Heartbeat to keep the runtime available. - """ + """Heartbeat to keep the runtime available.""" try: while not self._cancel_event.is_set(): try: - - async def on_keep_alive_response( - response: CompletionMessage, - ) -> None: - if response.error: - logger.error(f"Error during keep-alive: {response.error}") - return - session_ids = response.result - logger.info(f"Server active sessions: {session_ids}") - runtime_sessions = {} - for sid, s in self._session_servers.items(): - health = s.get_health_info() - runtime_sessions[sid] = { - "task_done": health.task_done, - "active_requests": len(s._active_requests), - } - logger.info(f"Runtime active sessions: {runtime_sessions}") - # If there are no active sessions and this is a sandbox environment - # We need to cancel the runtime - # eg: when user kills the agent that triggered the runtime, before we subscribe to events - if ( - not session_ids - and self.sandboxed - and not self._cancel_event.is_set() - ): - logger.error( - "No active sessions, cancelling sandboxed runtime..." - ) - self._cancel_event.set() - if self._signalr_client: logger.info("Sending keep-alive ping...") await self._signalr_client.send( method="OnKeepAlive", arguments=[], - on_invocation=on_keep_alive_response, # type: ignore + on_invocation=self._on_keep_alive_response, # type: ignore ) else: logger.error("SignalR client not initialized during keep-alive") @@ -839,9 +828,7 @@ async def on_keep_alive_response( raise async def _on_runtime_abort(self) -> None: - """ - Sends a runtime abort signalr to terminate all connected sessions. - """ + """Send a runtime abort request to terminate all connected sessions.""" try: response = await self._uipath.api_client.request_async( "POST", @@ -854,7 +841,7 @@ async def _on_runtime_abort(self) -> None: ) else: logger.error( - f"Error sending runtime abort signalr to UiPath MCP Server: {response.status_code} - {response.text}" + f"Error sending runtime abort to UiPath MCP Server: {response.status_code} - {response.text}" ) except Exception as e: logger.error(