Skip to content

Commit b6cfcaa

Browse files
authored
Merge pull request #475 from avinxshKD/feat/cpp-zmq-transport
feat: add ZMQ transport to concore.hpp/concore_base.hpp
2 parents d302863 + d8f3e6c commit b6cfcaa

File tree

2 files changed

+231
-0
lines changed

2 files changed

+231
-0
lines changed

concore.hpp

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ class Concore{
4949
int communication_iport = 0; // iport refers to input port
5050
int communication_oport = 0; // oport refers to input port
5151

52+
#ifdef CONCORE_USE_ZMQ
53+
map<string, concore_base::ZeroMQPort*> zmq_ports;
54+
#endif
55+
5256
public:
5357
double delay = 1;
5458
int retrycount = 0;
@@ -107,6 +111,11 @@ class Concore{
107111
*/
108112
~Concore()
109113
{
114+
#ifdef CONCORE_USE_ZMQ
115+
for (auto& kv : zmq_ports)
116+
delete kv.second;
117+
zmq_ports.clear();
118+
#endif
110119
#ifdef __linux__
111120
// Detach the shared memory segment from the process
112121
if (communication_oport == 1 && sharedData_create != nullptr) {
@@ -621,6 +630,117 @@ class Concore{
621630
}
622631
}
623632

633+
#ifdef CONCORE_USE_ZMQ
634+
/**
635+
* @brief Registers a ZMQ port for use with read()/write().
636+
* @param port_name The ZMQ port name.
637+
* @param port_type "bind" or "connect".
638+
* @param address The ZMQ address.
639+
* @param socket_type_str The socket type string.
640+
*/
641+
void init_zmq_port(string port_name, string port_type, string address, string socket_type_str) {
642+
if (zmq_ports.count(port_name)) return;
643+
int sock_type = concore_base::zmq_socket_type_from_string(socket_type_str);
644+
if (sock_type == -1) {
645+
cerr << "init_zmq_port: unknown socket type '" << socket_type_str << "'" << endl;
646+
return;
647+
}
648+
zmq_ports[port_name] = new concore_base::ZeroMQPort(port_type, address, sock_type);
649+
}
650+
651+
/**
652+
* @brief Reads data from a ZMQ port. Strips simtime prefix, updates simtime.
653+
* @param port_name The ZMQ port name.
654+
* @param name The name of the file.
655+
* @param initstr The initial string.
656+
* @return a vector of double values
657+
*/
658+
vector<double> read_ZMQ(string port_name, string name, string initstr) {
659+
auto it = zmq_ports.find(port_name);
660+
if (it == zmq_ports.end()) {
661+
cerr << "read_ZMQ: port '" << port_name << "' not initialized" << endl;
662+
return parser(initstr);
663+
}
664+
vector<double> inval = it->second->recv_with_retry();
665+
if (inval.empty())
666+
inval = parser(initstr);
667+
if (inval.empty()) return inval;
668+
simtime = simtime > inval[0] ? simtime : inval[0];
669+
s += port_name;
670+
inval.erase(inval.begin());
671+
return inval;
672+
}
673+
674+
/**
675+
* @brief Writes a vector of double values to a ZMQ port. Prepends simtime+delta.
676+
* @param port_name The ZMQ port name.
677+
* @param name The name of the file.
678+
* @param val The vector of double values to write.
679+
* @param delta The delta value (default: 0).
680+
*/
681+
void write_ZMQ(string port_name, string name, vector<double> val, int delta=0) {
682+
auto it = zmq_ports.find(port_name);
683+
if (it == zmq_ports.end()) {
684+
cerr << "write_ZMQ: port '" << port_name << "' not initialized" << endl;
685+
return;
686+
}
687+
val.insert(val.begin(), simtime + delta);
688+
it->second->send_with_retry(val);
689+
// simtime must not be mutated here (issue #385).
690+
}
691+
692+
/**
693+
* @brief Writes a string to a ZMQ port.
694+
* @param port_name The ZMQ port name.
695+
* @param name The name of the file.
696+
* @param val The string to write.
697+
* @param delta The delta value (default: 0).
698+
*/
699+
void write_ZMQ(string port_name, string name, string val, int delta=0) {
700+
auto it = zmq_ports.find(port_name);
701+
if (it == zmq_ports.end()) {
702+
cerr << "write_ZMQ: port '" << port_name << "' not initialized" << endl;
703+
return;
704+
}
705+
chrono::milliseconds timespan((int)(2000*delay));
706+
this_thread::sleep_for(timespan);
707+
it->second->send_string_with_retry(val);
708+
}
709+
710+
/**
711+
* @brief deviate the read to ZMQ communication protocol when port identifier is a string key.
712+
* @param port_name The ZMQ port name.
713+
* @param name The name of the file.
714+
* @param initstr The initial string.
715+
* @return
716+
*/
717+
vector<double> read(string port_name, string name, string initstr) {
718+
return read_ZMQ(port_name, name, initstr);
719+
}
720+
721+
/**
722+
* @brief deviate the write to ZMQ communication protocol when port identifier is a string key.
723+
* @param port_name The ZMQ port name.
724+
* @param name The name of the file.
725+
* @param val The vector of double values to write.
726+
* @param delta The delta value (default: 0).
727+
*/
728+
void write(string port_name, string name, vector<double> val, int delta=0) {
729+
return write_ZMQ(port_name, name, val, delta);
730+
}
731+
732+
/**
733+
* @brief deviate the write to ZMQ communication protocol when port identifier is a string key.
734+
* @param port_name The ZMQ port name.
735+
* @param name The name of the file.
736+
* @param val The string to write.
737+
* @param delta The delta value (default: 0).
738+
*/
739+
void write(string port_name, string name, string val, int delta=0) {
740+
return write_ZMQ(port_name, name, val, delta);
741+
}
742+
#endif // CONCORE_USE_ZMQ
743+
624744
/**
625745
* @brief Strips leading and trailing whitespace from a string.
626746
* @param str The input string.

concore_base.hpp

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,117 @@ inline std::string tryparam(
178178
return (it != params.end()) ? it->second : defaultValue;
179179
}
180180

181+
182+
// ===================================================================
183+
// ZeroMQ Transport (opt-in: compile with -DCONCORE_USE_ZMQ)
184+
// ===================================================================
185+
#ifdef CONCORE_USE_ZMQ
186+
#include <zmq.hpp>
187+
188+
/**
189+
* ZMQ socket wrapper with bind/connect, timeouts, and retry.
190+
*/
191+
class ZeroMQPort {
192+
public:
193+
zmq::context_t context;
194+
zmq::socket_t socket;
195+
std::string port_type;
196+
std::string address;
197+
198+
ZeroMQPort(const std::string& port_type_, const std::string& address_, int socket_type)
199+
: context(1), socket(context, socket_type),
200+
port_type(port_type_), address(address_)
201+
{
202+
socket.setsockopt(ZMQ_RCVTIMEO, 2000);
203+
socket.setsockopt(ZMQ_SNDTIMEO, 2000);
204+
socket.setsockopt(ZMQ_LINGER, 0);
205+
206+
if (port_type == "bind")
207+
socket.bind(address);
208+
else
209+
socket.connect(address);
210+
}
211+
212+
ZeroMQPort(const ZeroMQPort&) = delete;
213+
ZeroMQPort& operator=(const ZeroMQPort&) = delete;
214+
215+
/**
216+
* Sends a vector<double> as "[v0, v1, ...]" with retry on timeout.
217+
*/
218+
void send_with_retry(const std::vector<double>& payload) {
219+
std::ostringstream ss;
220+
ss << "[";
221+
for (size_t i = 0; i < payload.size(); ++i) {
222+
if (i) ss << ", ";
223+
ss << payload[i];
224+
}
225+
ss << "]";
226+
std::string msg = ss.str();
227+
for (int attempt = 0; attempt < 5; ++attempt) {
228+
try {
229+
zmq::message_t zmsg(msg.begin(), msg.end());
230+
socket.send(zmsg, zmq::send_flags::none);
231+
return;
232+
} catch (const zmq::error_t&) {
233+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
234+
}
235+
}
236+
std::cerr << "ZMQ send failed after retries." << std::endl;
237+
}
238+
239+
/**
240+
* Sends a raw string with retry on timeout.
241+
*/
242+
void send_string_with_retry(const std::string& msg) {
243+
for (int attempt = 0; attempt < 5; ++attempt) {
244+
try {
245+
zmq::message_t zmsg(msg.begin(), msg.end());
246+
socket.send(zmsg, zmq::send_flags::none);
247+
return;
248+
} catch (const zmq::error_t&) {
249+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
250+
}
251+
}
252+
std::cerr << "ZMQ send failed after retries." << std::endl;
253+
}
254+
255+
/**
256+
* Receives and parses "[v0, v1, ...]" back to vector<double>.
257+
*/
258+
std::vector<double> recv_with_retry() {
259+
for (int attempt = 0; attempt < 5; ++attempt) {
260+
try {
261+
zmq::message_t zmsg;
262+
auto res = socket.recv(zmsg, zmq::recv_flags::none);
263+
if (res) {
264+
std::string data(static_cast<char*>(zmsg.data()), zmsg.size());
265+
return parselist_double(data);
266+
}
267+
} catch (const zmq::error_t&) {
268+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
269+
}
270+
}
271+
std::cerr << "ZMQ recv failed after retries." << std::endl;
272+
return {};
273+
}
274+
};
275+
276+
/**
277+
* Maps socket type string ("REQ", "REP", etc.) to ZMQ constant.
278+
* Returns -1 on unknown type.
279+
*/
280+
inline int zmq_socket_type_from_string(const std::string& s) {
281+
if (s == "REQ") return ZMQ_REQ;
282+
if (s == "REP") return ZMQ_REP;
283+
if (s == "PUB") return ZMQ_PUB;
284+
if (s == "SUB") return ZMQ_SUB;
285+
if (s == "PUSH") return ZMQ_PUSH;
286+
if (s == "PULL") return ZMQ_PULL;
287+
if (s == "PAIR") return ZMQ_PAIR;
288+
return -1;
289+
}
290+
#endif // CONCORE_USE_ZMQ
291+
181292
} // namespace concore_base
182293

183294
#endif // CONCORE_BASE_HPP

0 commit comments

Comments
 (0)