-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEventMsgWorker.py
More file actions
126 lines (89 loc) · 3.32 KB
/
EventMsgWorker.py
File metadata and controls
126 lines (89 loc) · 3.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#! /usr/bin/python
# ArtCM OMQS Event Worker Class
#
# (C) ArtCM 2015
#
# Date: 2015.9.22
__author__ = 'dennis'
from multiprocessing import Queue
import threading
import os, signal, time, logging, platform
import pika
from pika.exceptions import *
from ConfigReader import ConfigReader
from MsgWorker import MsgWorker
from AsyncPublisher import OMQSAsyncPublisher
from OMQSExceptions import *
from OMQSLogManager import OMQSLogManager
class EventMsgWorker(MsgWorker):
    """Worker that forwards queued event messages to the OMQS event exchange.

    Messages are strings of the form ``<routing key>:<body>``.  They are
    published through an ``OMQSAsyncPublisher`` that runs on its own thread.
    Messages that cannot be published are appended to a local backup file.

    NOTE(review): ``worker_will_run``, ``msg_did_receive`` and
    ``worker_will_stop`` look like lifecycle hooks driven by ``MsgWorker``;
    ``self.name`` and ``self.pid`` are presumably provided by that base
    class -- confirm against MsgWorker.
    """

    # Routing key used when a message does not carry one of its own.
    EVENT_KEY = 'omqs.key.event'
    # File that receives messages which could not be published.
    BACKUP_FILE = './log/backup.evt'

    def __init__(self, q, name='EventMsgWorker'):
        """Create the worker.

        :param q: queue handed to the ``MsgWorker`` base class.
        :param name: worker name; also used as the log file name.
        """
        super(EventMsgWorker, self).__init__(q, name)
        self._MQPublisher = None
        self._MQthread = None
        manager = OMQSLogManager(name=__name__, file_name=name)
        self._logger = manager.logger

    def worker_will_run(self):
        """Start the publisher thread and wait until the publisher is ready.

        A publisher left over from an earlier run is stopped first.  Any
        failure is logged, the publisher is stopped and cleared, and the
        method returns normally (no exception is propagated).
        """
        try:
            if self._MQPublisher:
                self._MQPublisher.stop()

            self._MQPublisher = OMQSAsyncPublisher(exchange_name='omqs.exchange.event',
                                                   queue_name='omqs.queue.event',
                                                   exchange_durable=True,
                                                   queue_durable=True,
                                                   confirm=False)
            if not self._MQPublisher:
                raise InvalidPublisherError

            self._MQthread = threading.Thread(target=self._MQPublisher.run, name='PublishingThread')
            self._MQthread.start()

            while not self._MQPublisher.is_ready():
                # If the publishing thread has already exited, the publisher
                # can never become ready; bail out instead of spinning forever.
                if not self._MQthread.is_alive():
                    raise InvalidPublisherError
                time.sleep(0.1)

            self._logger.info('[%s][%d] ready to run!', self.name, self.pid)
        except Exception as e:
            self._logger.error('[%s][%d] error: %s. Stopping the publisher', self.name, self.pid, str(e))
            if self._MQPublisher:
                self._MQPublisher.stop()
            self._MQPublisher = None

    def msg_did_receive(self, msg):
        """Publish one ``<routing key>:<body>`` message.

        The text before the first ``:`` is the routing key and the rest is
        the body.  When the key is empty or the message contains no ``:``,
        ``EVENT_KEY`` is used; in the latter case the whole message is the
        body.  If there is no publisher, or publishing fails, the original
        message is written to the backup file instead.

        :param msg: message string taken from the queue.
        """
        if not self._MQPublisher:
            # No MQ connection available: keep the message rather than drop it.
            self._backup_message(msg)
            return

        try:
            key, separator, body = msg.partition(':')
            if not separator:
                # No key prefix at all: treat the entire message as the body.
                key, body = '', msg
            if not key:
                key = EventMsgWorker.EVENT_KEY
            self._MQPublisher.publish_message(body, key)
        except Exception as e:
            self._logger.error('[%s][%d] error when publishing: %s', self.name, self.pid, str(e))
            # backup the log message if there is anything wrong of the MQ connection
            self._backup_message(msg)

        # TODO here:
        # Do particular task for special work, e.g: sending txt msg or email for login

    def _backup_message(self, msg):
        """Append *msg* as one line to ``BACKUP_FILE``; log if that fails."""
        try:
            with open(self.BACKUP_FILE, 'a') as backup_file:
                backup_file.write('%s\n' % (msg,))
        except (IOError, OSError) as e:
            # A failing backup must not take the worker down with it.
            self._logger.error('[%s][%d] error when backing up message: %s', self.name, self.pid, str(e))

    def worker_will_stop(self):
        """Stop the publisher and wait for its thread to finish."""
        if self._MQPublisher:
            print('worker will quit!!')
            self._MQPublisher.stop()
            if self._MQthread is not None:
                self._MQthread.join()
                self._MQthread = None
def main():
    """Manual smoke test for EventMsgWorker.

    Starts a worker, feeds it 99 numbered messages at roughly ten per
    second, prints how long the feeding took, then waits three seconds
    before terminating the worker and joining it.
    """
    queue = Queue()
    event_worker = EventMsgWorker(queue)
    event_worker.start()

    started = time.time()
    for seq in range(1, 100):
        queue.put('omqs.key.event:%d' % seq)
        time.sleep(0.1)
    elapsed = time.time() - started
    print('total time: %f' % elapsed)

    # Pause before shutting the worker down.
    time.sleep(3)
    event_worker.terminate()
    event_worker.join()
# Script entry point, kept for manual testing only: running this file directly
# invokes main(); importing it as a module does not.
if __name__ == '__main__':
    main()