-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathLogMsgWorker.py
More file actions
118 lines (86 loc) · 3.15 KB
/
LogMsgWorker.py
File metadata and controls
118 lines (86 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#! /usr/bin/python
# ArtCM OMQS Log Worker Class
#
# (C) ArtCM 2015
#
# Date: 2015.9.22
__author__ = 'dennis'
from multiprocessing import Queue
import time, logging, os, sys
from ConfigReader import ConfigReader
from MsgWorker import MsgWorker
from SyncPublisher import OMQSSyncPublisher
from OMQSExceptions import *
from OMQSLogManager import OMQSLogManager
class LogMsgWorker(MsgWorker):
    """Worker that publishes queued log messages through an OMQS publisher.

    Each received message is expected to have the form ``'<type>:<body>'``,
    for example::

        fatal: fatal msg
        error: error msg
        warning: warning msg
        info: info msg
        debug: debug msg

    The part before the first ``':'`` is handed to the publisher as the key
    and the remainder as the body. Messages that cannot be published are
    appended to a local backup file so that they are not lost.
    """

    # File that receives messages which could not be published.
    _BACKUP_PATH = './log/backup.log'

    def __init__(self, q, name='LogMsgWorker'):
        """Create the worker.

        :param q: queue handed to the ``MsgWorker`` base class.
        :param name: worker name; also used as the log file name.
        """
        super(LogMsgWorker, self).__init__(q, name)
        self._MQPublisher = None
        self._logger = OMQSLogManager(name=__name__, file_name=name).logger

    def worker_will_run(self):
        """Create and run the publisher for the ``omqs.exchange.log`` exchange.

        Any failure is logged and leaves the worker without a publisher
        (``self._MQPublisher`` is ``None``) instead of raising.
        """
        try:
            # Stop a publisher left over from a previous run before replacing it.
            if self._MQPublisher:
                self._MQPublisher.stop()

            self._MQPublisher = OMQSSyncPublisher(exchange_name='omqs.exchange.log',
                                                  exchange_type='topic',
                                                  exchange_durable=True)
            if self._MQPublisher:
                self._MQPublisher.run()
            else:
                raise InvalidPublisherError
        except Exception as e:
            self._logger.error('[%s][%d] Failed to start the publisher: %s', self.name, self.pid, str(e))
            # Clean up without letting a failing stop() escape the handler.
            self._release_publisher()

    def msg_did_receive(self, msg):
        """Publish one log message, backing it up locally on failure.

        :param msg: message of the form ``'<type>:<body>'``.

        A message without a ``':'`` separator is treated as a publishing
        failure: it is logged and written to the backup file rather than
        being published under a mangled key.
        """
        if not self._MQPublisher:
            # NOTE(review): with no publisher the message is dropped, as in
            # the original code - confirm whether it should be backed up too.
            return

        try:
            key, separator, body = msg.partition(':')
            if not separator:
                raise ValueError('log message has no "<type>:" prefix: %r' % (msg,))
            self._MQPublisher.publish_message(key, body)
        except Exception as e:
            self._logger.error('[%s][%d] Failed to publish the publisher: %s', self.name, self.pid, str(e))
            # Keep the message on disk if anything went wrong while publishing.
            self._backup_message(msg)

    def worker_will_stop(self):
        """Stop the publisher, if any, and forget it.

        Errors raised while stopping are logged as warnings, not propagated.
        """
        self._release_publisher()

    def _release_publisher(self):
        """Stop the current publisher (if any) and clear the reference to it."""
        publisher, self._MQPublisher = self._MQPublisher, None
        if not publisher:
            return
        try:
            publisher.stop()
        except Exception as e:
            self._logger.warning('[%s][%d] Error when stopping publisher: %s', self.name, self.pid, str(e))

    def _backup_message(self, msg):
        """Append ``msg`` as one line to the backup file."""
        # The context manager closes the file even if the write fails.
        with open(self._BACKUP_PATH, 'a') as backup_file:
            backup_file.write('%s\n' % (msg,))
def main():
    """Manual smoke test for ``LogMsgWorker``.

    Starts a worker, enqueues 99 rounds of four sample log messages
    (info, debug, error, fatal) with a short pause after each round, prints
    the elapsed time, waits a few seconds, and then terminates the worker.
    The worker is terminated and joined even if enqueueing fails.
    """
    q = Queue()
    worker = LogMsgWorker(q)
    worker.start()

    try:
        t1 = time.time()
        for i in range(1, 100):
            q.put('info:this is test log %d' % i)
            q.put('debug:this is test log %d' % i)
            q.put('error:this is test error log %d' % i)
            q.put('fatal:this is test fatal log %d' % i)
            time.sleep(0.1)
        t2 = time.time()

        dt = t2 - t1
        # Parenthesised single argument prints the same on Python 2 and 3.
        print('total time: %f' % dt)

        # Give the worker a moment to work through the queue before stopping it.
        time.sleep(3)
    finally:
        worker.terminate()
        worker.join()
# Test-only entry point: run the sample driver when executed as a script.
if __name__ == "__main__":
    main()