Skip to content

Commit 36a974a

Browse files
committed
fix: handle transport close with pending server-to-client requests
Two additional races in the same transport-close window as the previous commit, both triggered when handlers are blocked on server-to-client requests (sampling, roots, elicitation) at the moment the transport closes: 1. _receive_loop's finally iterates _response_streams.items() with await checkpoints inside the loop. The woken handler's send_request finally pops from that dict before the iterator's next __next__(), raising RuntimeError: dictionary changed size during iteration. Fix: snapshot with list() before iterating. 2. The woken handler's send_request raises MCPError (CONNECTION_CLOSED), which _handle_request catches and converts to an error response. It then falls through to message.respond() against a write stream that _receive_loop already closed. Fix: catch ClosedResourceError and drop the response. Both reproduce deterministically with two handlers blocked on list_roots() when to_server is closed. Single test covers both: fails 20/20 with either fix reverted, passes 50/50 with both.
1 parent f638a9d commit 36a974a

File tree

3 files changed

+89
-2
lines changed

3 files changed

+89
-2
lines changed

src/mcp/server/lowlevel/server.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,14 @@ async def _handle_request(
490490
raise err
491491
response = types.ErrorData(code=0, message=str(err))
492492

493-
await message.respond(response)
493+
try:
494+
await message.respond(response)
495+
except anyio.ClosedResourceError:
496+
# Transport closed between handler unblocking and respond. Happens
497+
# when _receive_loop's finally wakes a handler blocked on
498+
# send_request: the handler runs to respond() before run()'s TG
499+
# cancel fires, but after _receive_loop closed _write_stream.
500+
logger.debug("Response for %s dropped - transport closed", message.request_id)
494501
else: # pragma: no cover
495502
await message.respond(types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found"))
496503

src/mcp/shared/session.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,9 @@ async def _receive_loop(self) -> None:
418418
finally:
419419
# after the read stream is closed, we need to send errors
420420
# to any pending requests
421-
for id, stream in self._response_streams.items():
421+
# Snapshot: stream.send() wakes the waiter, whose finally pops
422+
# from _response_streams before the next __next__() call.
423+
for id, stream in list(self._response_streams.items()):
422424
error = ErrorData(code=CONNECTION_CLOSED, message="Connection closed")
423425
try:
424426
await stream.send(JSONRPCError(jsonrpc="2.0", id=id, error=error))

tests/server/test_cancel_handling.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,81 @@ async def run_server():
170170
await server_run_returned.wait()
171171

172172
assert handler_cancelled.is_set()
173+
174+
175+
@pytest.mark.anyio
176+
async def test_server_handles_transport_close_with_pending_server_to_client_requests():
177+
"""When the transport closes while handlers are blocked on server→client
178+
requests (sampling, roots, elicitation), server.run() must still exit cleanly.
179+
180+
Two bugs covered:
181+
1. _receive_loop's finally iterates _response_streams with await checkpoints
182+
inside; the woken handler's send_request finally pops from that dict
183+
before the next __next__() — RuntimeError: dictionary changed size.
184+
2. The woken handler's MCPError is caught in _handle_request, which falls
185+
through to respond() against a write stream _receive_loop already closed.
186+
"""
187+
handlers_started = 0
188+
both_started = anyio.Event()
189+
server_run_returned = anyio.Event()
190+
191+
async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
192+
nonlocal handlers_started
193+
handlers_started += 1
194+
if handlers_started == 2:
195+
both_started.set()
196+
# Blocks on send_request waiting for a client response that never comes.
197+
# _receive_loop's finally will wake this with CONNECTION_CLOSED.
198+
await ctx.session.list_roots()
199+
raise AssertionError # pragma: no cover
200+
201+
server = Server("test", on_call_tool=handle_call_tool)
202+
203+
to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10)
204+
server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10)
205+
206+
async def run_server():
207+
await server.run(server_read, server_write, server.create_initialization_options())
208+
server_run_returned.set()
209+
210+
init_req = JSONRPCRequest(
211+
jsonrpc="2.0",
212+
id=1,
213+
method="initialize",
214+
params=InitializeRequestParams(
215+
protocol_version=LATEST_PROTOCOL_VERSION,
216+
capabilities=ClientCapabilities(),
217+
client_info=Implementation(name="test", version="1.0"),
218+
).model_dump(by_alias=True, mode="json", exclude_none=True),
219+
)
220+
initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized")
221+
222+
with anyio.fail_after(5):
223+
async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server:
224+
tg.start_soon(run_server)
225+
226+
await to_server.send(SessionMessage(init_req))
227+
await from_server.receive() # init response
228+
await to_server.send(SessionMessage(initialized))
229+
230+
# Two tool calls → two handlers → two _response_streams entries.
231+
for rid in (2, 3):
232+
call_req = JSONRPCRequest(
233+
jsonrpc="2.0",
234+
id=rid,
235+
method="tools/call",
236+
params=CallToolRequestParams(name="t", arguments={}).model_dump(by_alias=True, mode="json"),
237+
)
238+
await to_server.send(SessionMessage(call_req))
239+
240+
await both_started.wait()
241+
# Drain the two roots/list requests so send_request's _write_stream.send()
242+
# completes and both handlers are parked at response_stream_reader.receive().
243+
await from_server.receive()
244+
await from_server.receive()
245+
246+
await to_server.aclose()
247+
248+
# Without the fixes: RuntimeError (dict mutation) or ClosedResourceError
249+
# (respond after write-stream close) escapes run_server and this hangs.
250+
await server_run_returned.wait()

0 commit comments

Comments
 (0)