From 5d463ee0ef023c4ad10ca73152c53a2edf46349a Mon Sep 17 00:00:00 2001
From: Aidan Daly
Date: Fri, 10 Apr 2026 15:17:30 -0400
Subject: [PATCH 1/2] fix: make agentcore-worker-loop compatible with OTEL threading instrumentation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When opentelemetry-instrumentation-threading is active (e.g. via
opentelemetry-instrument in Dockerfile CMD), Thread.run() is wrapped to
propagate trace context from the parent thread. This leaks the parent's
running event loop state into the worker thread, causing run_forever()
to raise "RuntimeError: Cannot run the event loop while another loop is
running".

Three changes to _run_worker_loop / _ensure_worker_loop:

1. Clear leaked running-loop state with asyncio._set_running_loop(None)
   at the top of the worker thread — this is the critical fix.

2. Create the event loop inside the worker thread (not the parent) to
   follow the canonical asyncio pattern and eliminate cross-thread state
   leakage.

3. Use threading.Event + loop.call_soon(ready.set) so the parent waits
   until the loop is truly running before returning a reference, fixing
   a pre-existing race condition.
---
 src/bedrock_agentcore/runtime/app.py        | 28 +++++++++++++---
 tests/bedrock_agentcore/runtime/test_app.py | 37 +++++++++++++++++++++
 2 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/src/bedrock_agentcore/runtime/app.py b/src/bedrock_agentcore/runtime/app.py
index dd675b54..290a16ca 100644
--- a/src/bedrock_agentcore/runtime/app.py
+++ b/src/bedrock_agentcore/runtime/app.py
@@ -530,19 +530,37 @@ def _ensure_worker_loop(self) -> asyncio.AbstractEventLoop:
             return self._worker_loop
         with self._worker_loop_lock:
             if self._worker_loop is None or not self._worker_loop.is_running():
-                self._worker_loop = asyncio.new_event_loop()
+                ready = threading.Event()
                 self._worker_thread = threading.Thread(
                     target=self._run_worker_loop,
+                    args=(ready,),
                     daemon=True,
                     name="agentcore-worker-loop",
                 )
                 self._worker_thread.start()
+                if not ready.wait(timeout=10):
+                    raise RuntimeError("agentcore-worker-loop failed to start")
             return self._worker_loop
 
-    def _run_worker_loop(self) -> None:
-        """Entry point for the worker loop background thread."""
-        asyncio.set_event_loop(self._worker_loop)
-        self._worker_loop.run_forever()
+    def _run_worker_loop(self, ready: threading.Event) -> None:
+        """Entry point for the worker loop background thread.
+
+        The event loop is created here (inside the worker thread) rather than in
+        the parent thread to avoid conflicts with OpenTelemetry's threading
+        instrumentation, which propagates context from the parent thread and can
+        cause ``RuntimeError: Cannot run the event loop while another loop is
+        running``.
+        """
+        # Clear any running-loop state that leaked from the parent thread
+        # (e.g. via OpenTelemetry's threading instrumentation context propagation).
+        # Without this, run_forever() raises RuntimeError because
+        # asyncio._get_running_loop() still returns the parent's loop.
+        asyncio._set_running_loop(None)
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        self._worker_loop = loop
+        loop.call_soon(ready.set)
+        loop.run_forever()
 
     @staticmethod
     async def _run_with_context(coro: Any, ctx: contextvars.Context) -> Any:
diff --git a/tests/bedrock_agentcore/runtime/test_app.py b/tests/bedrock_agentcore/runtime/test_app.py
index c4d37b89..15d1ad07 100644
--- a/tests/bedrock_agentcore/runtime/test_app.py
+++ b/tests/bedrock_agentcore/runtime/test_app.py
@@ -2715,3 +2715,40 @@ async def handler(payload):
         content = response.content.decode("utf-8")
         assert 'data: {"chunk": "a"}' in content
         assert 'data: {"chunk": "b"}' in content
+
+    @pytest.mark.asyncio
+    async def test_worker_loop_compatible_with_otel_threading_instrumentation(self):
+        """Worker loop starts even when a running loop leaks into the child thread.
+
+        OpenTelemetry's opentelemetry-instrumentation-threading wraps Thread.run()
+        to propagate trace context. This can leak the parent thread's running-loop
+        state into the child thread, causing:
+            RuntimeError: Cannot run the event loop while another loop is running
+
+        The fix clears leaked running-loop state at the top of _run_worker_loop.
+        """
+        app = BedrockAgentCoreApp()
+        ready = threading.Event()
+
+        def otel_simulated_target():
+            """Simulate OTEL wrapper leaking a running loop before _run_worker_loop."""
+            leak = asyncio.new_event_loop()
+            asyncio._set_running_loop(leak)
+            try:
+                app._run_worker_loop(ready)
+            finally:
+                asyncio._set_running_loop(None)
+                leak.close()
+
+        thread = threading.Thread(target=otel_simulated_target, daemon=True)
+        thread.start()
+        assert ready.wait(timeout=5), "Worker loop failed to start under OTEL-like wrapper"
+
+        assert app._worker_loop is not None
+        assert app._worker_loop.is_running()
+
+        # Verify the loop can actually execute work
+        future = asyncio.run_coroutine_threadsafe(
+            asyncio.sleep(0, result="otel_ok"), app._worker_loop
+        )
+        assert future.result(timeout=5) == "otel_ok"

From 82aaf17787350313c74e2829e7731b6cf374273e Mon Sep 17 00:00:00 2001
From: Aidan Daly
Date: Fri, 10 Apr 2026 15:19:45 -0400
Subject: [PATCH 2/2] style: fix ruff formatting in test file

---
 tests/bedrock_agentcore/runtime/test_app.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/tests/bedrock_agentcore/runtime/test_app.py b/tests/bedrock_agentcore/runtime/test_app.py
index 15d1ad07..bfc756a0 100644
--- a/tests/bedrock_agentcore/runtime/test_app.py
+++ b/tests/bedrock_agentcore/runtime/test_app.py
@@ -2748,7 +2748,5 @@ def otel_simulated_target():
         assert app._worker_loop.is_running()
 
         # Verify the loop can actually execute work
-        future = asyncio.run_coroutine_threadsafe(
-            asyncio.sleep(0, result="otel_ok"), app._worker_loop
-        )
+        future = asyncio.run_coroutine_threadsafe(asyncio.sleep(0, result="otel_ok"), app._worker_loop)
         assert future.result(timeout=5) == "otel_ok"