Skip to content

Commit bee475e

Browse files
committed
test: deterministically cover message_router closed-stream handler
Replace the two remaining `lax no cover` pragmas with a unit test that deterministically triggers the race the handler defends against. The scenario: sse_writer receives a JSONRPCResponse and breaks its async-for loop; its finally block pops _request_streams. If message_router is concurrently routing a trailing message to the same request_id, the membership check passes but send() hits a closed receiver. Under xdist the scheduler contention made this fire ~15% of the time — enough to break strict-no-cover but not enough to rely on. The new test injects a closed-receiver stream directly into _request_streams, routes a message, and asserts the handler popped the stale entry. No scheduler timing involved. Also removed `# pragma: lax no cover` from `except Exception` in standalone_sse_writer (line 717) — the thread-based GET stream tests reliably cover it (both sequential and xdist). It was only unhit on main because subprocess tests couldn't see into the subprocess. Net: 0 lax no cover added, 2 more no cover removed.
1 parent b9b1656 commit bee475e

File tree

2 files changed

+42
-12
lines changed

2 files changed

+42
-12
lines changed

src/mcp/server/streamable_http.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -715,11 +715,7 @@ async def standalone_sse_writer():
715715
event_data = self._create_event_data(event_message)
716716
await sse_stream_writer.send(event_data)
717717
except Exception:
718-
# Timing race: if task cancellation reaches this writer before it
719-
# hits the closed stream, we get CancelledError (not caught here).
720-
# If the closed-stream error fires first, this logs. Neither order is
721-
# a bug — both are valid shutdown sequences.
722-
logger.exception("Error in standalone SSE writer") # pragma: lax no cover
718+
logger.exception("Error in standalone SSE writer")
723719
finally:
724720
logger.debug("Closing standalone SSE writer")
725721
await self._clean_up_memory_streams(GET_STREAM_KEY)
@@ -1019,13 +1015,7 @@ async def message_router():
10191015
try:
10201016
# Send both the message and the event ID
10211017
await self._request_streams[request_stream_id][0].send(EventMessage(message, event_id))
1022-
except ( # pragma: lax no cover
1023-
# Timing race: if cancellation reaches this coroutine
1024-
# before send() hits the closed stream, CancelledError
1025-
# propagates instead. Either shutdown sequence is valid.
1026-
anyio.BrokenResourceError,
1027-
anyio.ClosedResourceError,
1028-
):
1018+
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
10291019
# Stream might be closed, remove from registry
10301020
self._request_streams.pop(request_stream_id, None)
10311021
else:

tests/shared/test_streamable_http.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1705,6 +1705,46 @@ async def test_priming_event_not_sent_for_old_protocol_version():
17051705
await read_stream.aclose()
17061706

17071707

1708+
@pytest.mark.anyio
1709+
async def test_message_router_handles_closed_request_stream():
1710+
"""message_router must survive a stream closing between membership check and send.
1711+
1712+
Real scenario: sse_writer receives a JSONRPCResponse, breaks its async-for,
1713+
and runs cleanup — while message_router is concurrently routing a trailing
1714+
notification to the same request_id. The router's `if id in _request_streams`
1715+
check passes, but by the time send() tries to deliver, the receiver is gone.
1716+
The except (BrokenResourceError, ClosedResourceError) handler must catch
1717+
this and pop the stale entry so the router keeps routing other messages.
1718+
1719+
This test reproduces the race deterministically rather than relying on
1720+
scheduler timing (which only triggers under xdist contention).
1721+
"""
1722+
transport = StreamableHTTPServerTransport(mcp_session_id="test-session")
1723+
1724+
async with transport.connect() as (_, write_stream):
1725+
# Register a request stream pair as if a POST request had opened one,
1726+
# then close the receiver to simulate the sse_writer breaking its loop
1727+
# before cleanup pops the dict entry.
1728+
request_id = "req-1"
1729+
send_side, recv_side = anyio.create_memory_object_stream[EventMessage](0)
1730+
transport._request_streams[request_id] = (send_side, recv_side)
1731+
await recv_side.aclose() # receiver gone — paired sender will now raise
1732+
1733+
# Route a response targeting that request_id. message_router's membership
1734+
# check passes (still in dict), send() raises BrokenResourceError, the
1735+
# except handler pops the entry.
1736+
response = types.JSONRPCResponse(jsonrpc="2.0", id=request_id, result={})
1737+
await write_stream.send(SessionMessage(response))
1738+
1739+
# Yield to the router until it pops the stale entry (bounded by fail_after)
1740+
with anyio.fail_after(1):
1741+
while request_id in transport._request_streams:
1742+
await anyio.sleep(0)
1743+
1744+
assert request_id not in transport._request_streams
1745+
await send_side.aclose()
1746+
1747+
17081748
@pytest.mark.anyio
17091749
async def test_priming_event_not_sent_without_event_store():
17101750
"""Test that _maybe_send_priming_event returns early when no event_store is configured."""

0 commit comments

Comments
 (0)