This repository was archived by the owner on Oct 20, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbroker.hpp
More file actions
111 lines (80 loc) · 2.02 KB
/
broker.hpp
File metadata and controls
111 lines (80 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#ifndef SERVICE_QUEUE_BROKER_H
#define SERVICE_QUEUE_BROKER_H
#include "zmq.hpp"
#include <vector>
#include <mutex>
#include <condition_variable>
using namespace std;
typedef struct
{
string name;
time_t heartbeatSent;
time_t lastHeartbitRecieved;
} worker_t;
class broker
{
private:
zmq::context_t *ctx;
zmq::socket_t *input;
zmq::socket_t *output;
zmq::socket_t *service;
string inputDSN;
string outputDSN;
string serviceDSN;
vector<worker_t> workers;
int currentWorkerIndex;
mutex writeLock;
mutex workersLock;
condition_variable waitForWorkers;
bool connected;
bool interrupted;
void connect();
void registerWorker(const string &id);
void removeWorker(const string &id);
bool getNextWorker(string &workerName);
void shutdownAllWorkers();
void dispatchService();
void send(const string &data);
void sendMore(const string &data);
void send(const string &data, bool more);
void send(const zmq::message_t &msg);
void send(const zmq::message_t &msg, bool more);
void sendToWorker(const string &id, const string &data);
string getAction(const string &data);
string getMessageData(zmq::message_t &message);
static broker instance;
void heartbeat();
void workerPong(const string &id);
public:
broker();
void run();
void setInputDSN(string inputDSN)
{
broker::inputDSN = inputDSN;
}
void setOutputDSN(string outputDSN)
{
broker::outputDSN = outputDSN;
}
void setServiceDSN(string serviceDSN)
{
broker::serviceDSN = serviceDSN;
}
virtual ~broker()
{
if (connected)
{
input->close();
output->close();
service->close();
ctx->close();
delete input;
delete output;
delete service;
delete ctx;
}
}
static broker * getInstance();
static void signalHandler(int signal);
};
#endif //SERVICE_QUEUE_BROKER_H