Skip to content

Commit b3f0cb7

Browse files
fix: raise TimeoutError on ZMQ retry exhaustion instead of returning None (#393)
- recv_json_with_retry() now raises TimeoutError after 5 failed attempts instead of returning None - send_json_with_retry() now raises TimeoutError after 5 failed attempts instead of silently returning None - read() catches TimeoutError explicitly and returns default_return_val - write() catches TimeoutError explicitly and logs error without crashing - Added new test class TestZMQRetryExhaustion in test_concore.py with 4 tests - Added new test class TestZMQRetryExhaustion in test_concoredocker.py with 2 tests Closes #393
1 parent 5d46e60 commit b3f0cb7

3 files changed

Lines changed: 149 additions & 4 deletions

File tree

concore_base.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,7 @@ def send_json_with_retry(self, message):
4545
except zmq.Again:
4646
logger.warning(f"Send timeout (attempt {attempt + 1}/5)")
4747
time.sleep(0.5)
48-
logger.error("Failed to send after retries.")
49-
return
48+
raise TimeoutError(f"ZMQ send failed after 5 retries on {self.address}")
5049

5150
def recv_json_with_retry(self):
5251
"""Receive JSON message with retries if timeout occurs."""
@@ -56,8 +55,7 @@ def recv_json_with_retry(self):
5655
except zmq.Again:
5756
logger.warning(f"Receive timeout (attempt {attempt + 1}/5)")
5857
time.sleep(0.5)
59-
logger.error("Failed to receive after retries.")
60-
return None
58+
raise TimeoutError(f"ZMQ recv failed after 5 retries on {self.address}")
6159

6260

6361
def init_zmq_port(mod, port_name, port_type, address, socket_type_str):
@@ -221,6 +219,9 @@ def read(mod, port_identifier, name, initstr_val):
221219
mod.simtime = max(mod.simtime, first_element)
222220
return message[1:]
223221
return message
222+
except TimeoutError as e:
223+
logger.error(f"ZMQ recv timeout on port {port_identifier} (name: {name}): {e}. Returning default.")
224+
return default_return_val
224225
except zmq.error.ZMQError as e:
225226
logger.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
226227
return default_return_val
@@ -304,6 +305,8 @@ def write(mod, port_identifier, name, val, delta=0):
304305
# Mutation breaks cross-language determinism (see issue #385).
305306
else:
306307
zmq_p.send_json_with_retry(zmq_val)
308+
except TimeoutError as e:
309+
logger.error(f"ZMQ send timeout on port {port_identifier} (name: {name}): {e}")
307310
except zmq.error.ZMQError as e:
308311
logger.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
309312
except Exception as e:

tests/test_concore.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,3 +419,91 @@ def test_write_timestamp_matches_cpp_semantics(self, temp_dir):
419419
"After 3 writes with delta=1 simtime must remain 0 "
420420
"(matching C++/MATLAB/Verilog); got %s" % concore.simtime
421421
)
422+
423+
424+
# ===================================================================
425+
# ZMQ Retry Exhaustion Tests (Issue #393)
426+
# ===================================================================
427+
428+
class TestZMQRetryExhaustion:
429+
"""Tests for issue #393 — TimeoutError on retry exhaustion."""
430+
431+
@pytest.fixture(autouse=True)
432+
def reset_zmq_ports(self):
433+
import concore
434+
original_ports = concore.zmq_ports.copy()
435+
yield
436+
concore.zmq_ports.clear()
437+
concore.zmq_ports.update(original_ports)
438+
439+
@pytest.fixture(autouse=True)
440+
def reset_simtime(self):
441+
import concore
442+
old_simtime = concore.simtime
443+
yield
444+
concore.simtime = old_simtime
445+
446+
def test_recv_json_with_retry_raises_timeout_error(self):
447+
"""recv_json_with_retry must raise TimeoutError after 5 failed attempts."""
448+
from concore import ZeroMQPort
449+
from unittest.mock import MagicMock, patch
450+
import zmq
451+
452+
with patch.object(ZeroMQPort, '__init__', lambda self, *a, **kw: None):
453+
port = ZeroMQPort.__new__(ZeroMQPort)
454+
port.socket = MagicMock()
455+
port.socket.recv_json.side_effect = zmq.Again()
456+
port.address = "tcp://test:5555"
457+
458+
with pytest.raises(TimeoutError, match="ZMQ recv failed after 5 retries"):
459+
port.recv_json_with_retry()
460+
461+
assert port.socket.recv_json.call_count == 5
462+
463+
def test_send_json_with_retry_raises_timeout_error(self):
464+
"""send_json_with_retry must raise TimeoutError after 5 failed attempts."""
465+
from concore import ZeroMQPort
466+
from unittest.mock import MagicMock, patch
467+
import zmq
468+
469+
with patch.object(ZeroMQPort, '__init__', lambda self, *a, **kw: None):
470+
port = ZeroMQPort.__new__(ZeroMQPort)
471+
port.socket = MagicMock()
472+
port.socket.send_json.side_effect = zmq.Again()
473+
port.address = "tcp://test:5555"
474+
475+
with pytest.raises(TimeoutError, match="ZMQ send failed after 5 retries"):
476+
port.send_json_with_retry({"test": "data"})
477+
478+
assert port.socket.send_json.call_count == 5
479+
480+
def test_read_returns_default_on_zmq_timeout(self):
481+
"""read() must return default_return_val when recv exhausts retries, not None."""
482+
import concore
483+
484+
class MockZMQPort:
485+
def recv_json_with_retry(self):
486+
raise TimeoutError("ZMQ recv failed after 5 retries on tcp://test:5555")
487+
488+
concore.zmq_ports["test_timeout_port"] = MockZMQPort()
489+
concore.simtime = 0
490+
491+
result = concore.read("test_timeout_port", "test_name", "[1.0, 2.0]")
492+
493+
assert result == [1.0, 2.0], (
494+
"read() must return default_return_val on TimeoutError, got %s" % result
495+
)
496+
497+
def test_write_does_not_crash_on_zmq_send_timeout(self):
498+
"""write() must handle TimeoutError from send gracefully."""
499+
import concore
500+
501+
class MockZMQPort:
502+
def send_json_with_retry(self, message):
503+
raise TimeoutError("ZMQ send failed after 5 retries on tcp://test:5555")
504+
505+
concore.zmq_ports["test_timeout_port"] = MockZMQPort()
506+
concore.simtime = 0
507+
508+
# Should not raise — just log the error
509+
concore.write("test_timeout_port", "test_name", [1.0, 2.0])

tests/test_concoredocker.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,3 +235,57 @@ def recv_json_with_retry(self):
235235
result = concoredocker.read("roundtrip", "data", "[]")
236236

237237
assert result == original
238+
239+
240+
# ===================================================================
241+
# ZMQ Retry Exhaustion Tests (Issue #393)
242+
# ===================================================================
243+
244+
class TestZMQRetryExhaustion:
245+
"""Tests for issue #393 — TimeoutError on retry exhaustion via concoredocker."""
246+
247+
@pytest.fixture(autouse=True)
248+
def reset_zmq_ports(self):
249+
import concoredocker
250+
original_ports = concoredocker.zmq_ports.copy()
251+
yield
252+
concoredocker.zmq_ports.clear()
253+
concoredocker.zmq_ports.update(original_ports)
254+
255+
@pytest.fixture(autouse=True)
256+
def reset_simtime(self):
257+
import concoredocker
258+
old_simtime = concoredocker.simtime
259+
yield
260+
concoredocker.simtime = old_simtime
261+
262+
def test_read_returns_default_on_zmq_timeout(self):
263+
"""read() must return default_return_val when recv exhausts retries, not None."""
264+
import concoredocker
265+
266+
class MockZMQPort:
267+
def recv_json_with_retry(self):
268+
raise TimeoutError("ZMQ recv failed after 5 retries on tcp://test:5555")
269+
270+
concoredocker.zmq_ports["test_timeout_port"] = MockZMQPort()
271+
concoredocker.simtime = 0
272+
273+
result = concoredocker.read("test_timeout_port", "test_name", "[1.0, 2.0]")
274+
275+
assert result == [1.0, 2.0], (
276+
"read() must return default_return_val on TimeoutError, got %s" % result
277+
)
278+
279+
def test_write_does_not_crash_on_zmq_send_timeout(self):
280+
"""write() must handle TimeoutError from send gracefully."""
281+
import concoredocker
282+
283+
class MockZMQPort:
284+
def send_json_with_retry(self, message):
285+
raise TimeoutError("ZMQ send failed after 5 retries on tcp://test:5555")
286+
287+
concoredocker.zmq_ports["test_timeout_port"] = MockZMQPort()
288+
concoredocker.simtime = 0
289+
290+
# Should not raise — just log the error
291+
concoredocker.write("test_timeout_port", "test_name", [1.0, 2.0])

0 commit comments

Comments
 (0)