From 056437dfb09f6bc9e37a54d1f764fe64aeb6376e Mon Sep 17 00:00:00 2001 From: Gabriel C Date: Thu, 19 Mar 2026 00:03:52 +0100 Subject: [PATCH] fix: swallow shutdown RuntimeError in done_callback Catch RuntimeError related to interpreter shutdown in done_callback so that futures completing during shutdown don't raise. Non-shutdown RuntimeError is still re-raised. Closes #9211 --- distributed/client.py | 6 +++++- distributed/tests/test_client.py | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/distributed/client.py b/distributed/client.py index b2e227b9604..c4abed3e29b 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -755,7 +755,11 @@ async def done_callback(future, callback): """ while future.status == "pending": await future._state.wait() - callback(future) + try: + callback(future) + except RuntimeError as e: + if "shutdown" not in str(e) and "interpreter" not in str(e): + raise class AllExit(Exception): diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index fbb08076613..3a1f08cfead 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -8435,3 +8435,24 @@ def reducer(futs, *, offset=0, **kwargs): result = await future.result() assert result == 30 if not offset else 31 + + +@gen_cluster(client=True) +async def test_done_callback_shutdown_runtime_error(c, s, a, b): + from distributed.client import done_callback + + future = c.submit(inc, 1) + await future + + # Shutdown-related RuntimeError is swallowed + def cb_shutdown(fut): + raise RuntimeError("cannot schedule new futures after interpreter shutdown") + + await done_callback(future, cb_shutdown) + + # Unrelated RuntimeError is re-raised + def cb_other(fut): + raise RuntimeError("something else") + + with pytest.raises(RuntimeError, match="something else"): + await done_callback(future, cb_other)