Skip to content

Commit d302863

Browse files
authored
Merge pull request #473 from GREENRAT-K405/fix/redundant-context-instances
prevent redundant zeroMQ thread spawning by sharing a single context
2 parents 23a28d6 + 0081e83 commit d302863

1 file changed

Lines changed: 36 additions & 10 deletions

File tree

concore_base.py

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,29 @@
1111
# ===================================================================
1212
# ZeroMQ Communication Wrapper
1313
# ===================================================================
14+
# lazy-initialized shared ZMQ context for the entire process.
15+
# using None until first ZMQ port is created, so file-only workflows
16+
# never spawn ZMQ I/O threads at import time.
17+
_zmq_context = None
18+
19+
def _get_zmq_context():
20+
"""Return the process-level shared ZMQ context, creating it on first call."""
21+
global _zmq_context
22+
if _zmq_context is None or _zmq_context.closed:
23+
_zmq_context = zmq.Context()
24+
return _zmq_context
25+
1426
class ZeroMQPort:
15-
def __init__(self, port_type, address, zmq_socket_type):
27+
def __init__(self, port_type, address, zmq_socket_type, context=None):
1628
"""
1729
port_type: "bind" or "connect"
1830
address: ZeroMQ address (e.g., "tcp://*:5555")
1931
zmq_socket_type: zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB etc.
32+
context: optional zmq.Context() for the process; defaults to the shared _zmq_context.
2033
"""
21-
self.context = zmq.Context()
22-
self.socket = self.context.socket(zmq_socket_type)
34+
if context is None:
35+
context = _get_zmq_context()
36+
self.socket = context.socket(zmq_socket_type)
2337
self.port_type = port_type # "bind" or "connect"
2438
self.address = address
2539

@@ -76,7 +90,7 @@ def init_zmq_port(mod, port_name, port_type, address, socket_type_str):
7690
try:
7791
# Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP)
7892
zmq_socket_type = getattr(zmq, socket_type_str.upper())
79-
mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type)
93+
mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type, _get_zmq_context())
8094
logger.info(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}")
8195
except AttributeError:
8296
logger.error(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.")
@@ -86,23 +100,35 @@ def init_zmq_port(mod, port_name, port_type, address, socket_type_str):
86100
logger.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")
87101

88102
def terminate_zmq(mod):
89-
"""Clean up all ZMQ sockets and contexts before exit."""
103+
"""Clean up all ZMQ sockets, then terminate the shared context once."""
104+
global _zmq_context # declared first — used both in the early-return guard and reset below
90105
if mod._cleanup_in_progress:
91106
return # Already cleaning up, prevent reentrant calls
92-
93-
if not mod.zmq_ports:
94-
return # No ports to clean up
95-
107+
108+
if not mod.zmq_ports and (_zmq_context is None or _zmq_context.closed):
109+
return # Nothing to clean up: no ports and no active context
110+
96111
mod._cleanup_in_progress = True
97112
print("\nCleaning up ZMQ resources...")
113+
114+
# all sockets must be closed before context.term() is called.
98115
for port_name, port in mod.zmq_ports.items():
99116
try:
100117
port.socket.close()
101-
port.context.term()
102118
print(f"Closed ZMQ port: {port_name}")
103119
except Exception as e:
104120
logger.error(f"Error while terminating ZMQ port {port.address}: {e}")
105121
mod.zmq_ports.clear()
122+
123+
# terminate the single shared context exactly once, then reset so it
124+
# can be safely recreated if init_zmq_port is called again later.
125+
if _zmq_context is not None and not _zmq_context.closed:
126+
try:
127+
_zmq_context.term()
128+
except Exception as e:
129+
logger.error(f"Error while terminating shared ZMQ context: {e}")
130+
_zmq_context = None
131+
106132
mod._cleanup_in_progress = False
107133

108134
# --- ZeroMQ Integration End ---

0 commit comments

Comments
 (0)