diff --git a/src/bedrock_agentcore/runtime/app.py b/src/bedrock_agentcore/runtime/app.py
index dd675b54..290a16ca 100644
--- a/src/bedrock_agentcore/runtime/app.py
+++ b/src/bedrock_agentcore/runtime/app.py
@@ -530,19 +530,37 @@ def _ensure_worker_loop(self) -> asyncio.AbstractEventLoop:
             return self._worker_loop
         with self._worker_loop_lock:
             if self._worker_loop is None or not self._worker_loop.is_running():
-                self._worker_loop = asyncio.new_event_loop()
+                ready = threading.Event()
                 self._worker_thread = threading.Thread(
                     target=self._run_worker_loop,
+                    args=(ready,),
                     daemon=True,
                     name="agentcore-worker-loop",
                 )
                 self._worker_thread.start()
+                if not ready.wait(timeout=10):
+                    raise RuntimeError("agentcore-worker-loop failed to start")
             return self._worker_loop
 
-    def _run_worker_loop(self) -> None:
-        """Entry point for the worker loop background thread."""
-        asyncio.set_event_loop(self._worker_loop)
-        self._worker_loop.run_forever()
+    def _run_worker_loop(self, ready: threading.Event) -> None:
+        """Entry point for the worker loop background thread.
+
+        The event loop is created here (inside the worker thread) rather than in
+        the parent thread to avoid conflicts with OpenTelemetry's threading
+        instrumentation, which propagates context from the parent thread and can
+        cause ``RuntimeError: Cannot run the event loop while another loop is
+        running``.
+        """
+        # Clear any running-loop state that leaked from the parent thread
+        # (e.g. via OpenTelemetry's threading instrumentation context propagation).
+        # Without this, run_forever() raises RuntimeError because
+        # asyncio._get_running_loop() still returns the parent's loop.
+        asyncio._set_running_loop(None)
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        self._worker_loop = loop
+        loop.call_soon(ready.set)
+        loop.run_forever()
 
     @staticmethod
     async def _run_with_context(coro: Any, ctx: contextvars.Context) -> Any:
diff --git a/tests/bedrock_agentcore/runtime/test_app.py b/tests/bedrock_agentcore/runtime/test_app.py
index c4d37b89..bfc756a0 100644
--- a/tests/bedrock_agentcore/runtime/test_app.py
+++ b/tests/bedrock_agentcore/runtime/test_app.py
@@ -2715,3 +2715,47 @@ async def handler(payload):
         content = response.content.decode("utf-8")
         assert 'data: {"chunk": "a"}' in content
         assert 'data: {"chunk": "b"}' in content
+
+    @pytest.mark.asyncio
+    async def test_worker_loop_compatible_with_otel_threading_instrumentation(self):
+        """Worker loop starts even when a running loop leaks into the child thread.
+
+        OpenTelemetry's opentelemetry-instrumentation-threading wraps Thread.run()
+        to propagate trace context. This can leak the parent thread's running-loop
+        state into the child thread, causing:
+            RuntimeError: Cannot run the event loop while another loop is running
+
+        The fix clears leaked running-loop state at the top of _run_worker_loop.
+        """
+        app = BedrockAgentCoreApp()
+        ready = threading.Event()
+
+        def otel_simulated_target():
+            """Simulate OTEL wrapper leaking a running loop before _run_worker_loop."""
+            leak = asyncio.new_event_loop()
+            asyncio._set_running_loop(leak)
+            try:
+                app._run_worker_loop(ready)
+            finally:
+                asyncio._set_running_loop(None)
+                leak.close()
+
+        thread = threading.Thread(target=otel_simulated_target, daemon=True)
+        thread.start()
+        try:
+            assert ready.wait(timeout=5), "Worker loop failed to start under OTEL-like wrapper"
+
+            assert app._worker_loop is not None
+            assert app._worker_loop.is_running()
+
+            # Verify the loop can actually execute work
+            future = asyncio.run_coroutine_threadsafe(asyncio.sleep(0, result="otel_ok"), app._worker_loop)
+            assert future.result(timeout=5) == "otel_ok"
+        finally:
+            # Stop the worker loop so its thread and both event loops do not outlive the test.
+            worker_loop = app._worker_loop
+            if worker_loop is not None:
+                worker_loop.call_soon_threadsafe(worker_loop.stop)
+                thread.join(timeout=5)
+                if not worker_loop.is_running():
+                    worker_loop.close()