Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions src/bedrock_agentcore/runtime/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,19 +530,37 @@ def _ensure_worker_loop(self) -> asyncio.AbstractEventLoop:
return self._worker_loop
with self._worker_loop_lock:
if self._worker_loop is None or not self._worker_loop.is_running():
self._worker_loop = asyncio.new_event_loop()
ready = threading.Event()
self._worker_thread = threading.Thread(
target=self._run_worker_loop,
args=(ready,),
daemon=True,
name="agentcore-worker-loop",
)
self._worker_thread.start()
if not ready.wait(timeout=10):
raise RuntimeError("agentcore-worker-loop failed to start")
return self._worker_loop

def _run_worker_loop(self) -> None:
    """Entry point for the worker loop background thread.

    Installs ``self._worker_loop`` as this thread's event loop and runs it
    until it is stopped. Intended to be used as a ``threading.Thread``
    target; it does not return while the loop is running.
    """
    # Clear any running-loop state that leaked into this thread (reported
    # with wrappers that propagate context from the parent thread, e.g.
    # OpenTelemetry's threading instrumentation). If asyncio still believes
    # another loop is running here, run_forever() raises
    # "RuntimeError: Cannot run the event loop while another loop is running".
    asyncio._set_running_loop(None)
    asyncio.set_event_loop(self._worker_loop)
    self._worker_loop.run_forever()
def _run_worker_loop(self, ready: threading.Event) -> None:
    """Entry point for the worker loop background thread.

    The event loop is created here (inside the worker thread) rather than in
    the parent thread to avoid conflicts with OpenTelemetry's threading
    instrumentation, which propagates context from the parent thread and can
    cause ``RuntimeError: Cannot run the event loop while another loop is
    running``.

    The loop is published on ``self._worker_loop`` and runs until it is
    stopped; it is closed before this method returns so its resources are
    released.

    Args:
        ready: Event that is set from inside the loop on its first
            iteration, signalling to the starting thread that
            ``self._worker_loop`` is assigned and running.
    """
    # Clear any running-loop state that leaked from the parent thread
    # (e.g. via OpenTelemetry's threading instrumentation context propagation).
    # Without this, run_forever() raises RuntimeError because
    # asyncio._get_running_loop() still returns the parent's loop.
    asyncio._set_running_loop(None)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    self._worker_loop = loop
    # Scheduled rather than called directly so that `ready` is only set once
    # the loop is actually executing callbacks.
    loop.call_soon(ready.set)
    try:
        loop.run_forever()
    finally:
        # run_forever() only returns after stop() (or an error); release the
        # loop's resources instead of leaking them with the thread.
        loop.close()

@staticmethod
async def _run_with_context(coro: Any, ctx: contextvars.Context) -> Any:
Expand Down
35 changes: 35 additions & 0 deletions tests/bedrock_agentcore/runtime/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2715,3 +2715,38 @@ async def handler(payload):
content = response.content.decode("utf-8")
assert 'data: {"chunk": "a"}' in content
assert 'data: {"chunk": "b"}' in content

@pytest.mark.asyncio
async def test_worker_loop_compatible_with_otel_threading_instrumentation(self):
    """Worker loop starts even when a running loop leaks into the child thread.

    OpenTelemetry's opentelemetry-instrumentation-threading wraps Thread.run()
    to propagate trace context. This can leak the parent thread's running-loop
    state into the child thread, causing:
        RuntimeError: Cannot run the event loop while another loop is running

    The fix clears leaked running-loop state at the top of _run_worker_loop.
    """
    app = BedrockAgentCoreApp()
    ready = threading.Event()

    def otel_simulated_target():
        """Simulate OTEL wrapper leaking a running loop before _run_worker_loop."""
        leak = asyncio.new_event_loop()
        asyncio._set_running_loop(leak)
        try:
            app._run_worker_loop(ready)
        finally:
            asyncio._set_running_loop(None)
            leak.close()

    thread = threading.Thread(target=otel_simulated_target, daemon=True)
    thread.start()
    try:
        assert ready.wait(timeout=5), "Worker loop failed to start under OTEL-like wrapper"

        assert app._worker_loop is not None
        assert app._worker_loop.is_running()

        # Verify the loop can actually execute work
        future = asyncio.run_coroutine_threadsafe(asyncio.sleep(0, result="otel_ok"), app._worker_loop)
        assert future.result(timeout=5) == "otel_ok"
    finally:
        # Stop the worker loop so _run_worker_loop returns; otherwise the
        # cleanup in otel_simulated_target never runs and the thread and its
        # loops outlive this test.
        worker_loop = app._worker_loop
        if worker_loop is not None and worker_loop.is_running():
            worker_loop.call_soon_threadsafe(worker_loop.stop)
        thread.join(timeout=5)
Loading