5757from logging import Logger
5858from queue import Queue
5959
60+ import stomp as _stomp_module
6061from stomp import Connection12 as Connection
6162from stomp .exception import NotConnectedException
6263from time import sleep
6566
6667_log : Logger = logging .getLogger (inspect .getmodulename (__file__ ))
6768
69+ # stomp.py 8.x changed listener callbacks from (headers, body) to (frame)
70+ _stomp_major = int (getattr (_stomp_module , '__version__' , (0 ,))[0 ]) if isinstance (getattr (_stomp_module , '__version__' , None ), tuple ) else 0
71+ try :
72+ from importlib .metadata import version as _pkg_version
73+ _stomp_major = int (_pkg_version ('stomp-py' ).split ('.' )[0 ])
74+ except Exception :
75+ pass
76+ _STOMP_V8 = _stomp_major >= 8
77+
6878
6979class GRIDAPPSD_ENV_ENUM (Enum ):
7080 GRIDAPPSD_USER = "GRIDAPPSD_USER"
@@ -182,7 +192,12 @@ def __init__(self, topic, result_format):
182192 self ._topic = topic
183193 self .result_format = result_format
184194
185- def on_message (self , header , message ):
195+ def on_message (self , * args ):
196+ if _STOMP_V8 :
197+ frame = args [0 ]
198+ header , message = frame .headers , frame .body
199+ else :
200+ header , message = args [0 ], args [1 ]
186201 _log .debug ("Internal on message is: {} {}" .format (header , message ))
187202 try :
188203 if self .result_format == "JSON" :
@@ -195,13 +210,15 @@ def on_message(self, header, message):
195210 except ValueError :
196211 self .response = dict (error = "Invalid json returned" , header = header , message = message )
197212
198- def on_error (self , headers , message ):
213+ def on_error (self , * args ):
214+ if _STOMP_V8 :
215+ frame = args [0 ]
216+ headers , message = frame .headers , frame .body
217+ else :
218+ headers , message = args [0 ], args [1 ]
199219 _log .error ("ERR: {}" .format (headers ))
200220 _log .error ("OUR ERROR: {}" .format (message ))
201221
202- def on_disconnect (self , header , message ):
203- _log .debug ("Disconnected" )
204-
205222 listener = ResponseListener (reply_to , self .result_format )
206223 self .subscribe (reply_to , listener )
207224
@@ -314,18 +331,25 @@ def __init__(self):
314331 def get_token (self ):
315332 return self .__token
316333
317- def on_message (self , header , message ):
334+ def on_message (self , * args ):
335+ if _STOMP_V8 :
336+ frame = args [0 ]
337+ header , message = frame .headers , frame .body
338+ else :
339+ header , message = args [0 ], args [1 ]
318340 _log .debug ("Internal on message is: {} {}" .format (header , message ))
319341
320342 self .__token = str (message )
321343
322- def on_error (self , headers , message ):
344+ def on_error (self , * args ):
345+ if _STOMP_V8 :
346+ frame = args [0 ]
347+ headers , message = frame .headers , frame .body
348+ else :
349+ headers , message = args [0 ], args [1 ]
323350 _log .error ("ERR: {}" .format (headers ))
324351 _log .error ("OUR ERROR: {}" .format (message ))
325352
326- def on_disconnect (self , header , message ):
327- _log .debug ("Disconnected" )
328-
329353 # receive token and set token variable
330354 # set callback
331355 listener = TokenResponseListener ()
@@ -399,15 +423,25 @@ def remove_callback(self, topic, callback):
399423 except ValueError :
400424 pass
401425
402- def on_message (self , headers , message ):
426+ def on_message (self , * args ):
427+ if _STOMP_V8 :
428+ frame = args [0 ]
429+ headers , message = frame .headers , frame .body
430+ else :
431+ headers , message = args [0 ], args [1 ]
403432 destination = headers ["destination" ]
404433 # _log.debug("Topic map keys are: {keys}".format(keys=self._topics_callback_map.keys()))
405434 if destination in self ._topics_callback_map :
406435 self ._queue_callerback .put ((self ._topics_callback_map [destination ], headers , message ))
407436 else :
408437 _log .error ("INVALID DESTINATION {destination}" .format (destination = destination ))
409438
410- def on_error (self , header , message ):
439+ def on_error (self , * args ):
440+ if _STOMP_V8 :
441+ frame = args [0 ]
442+ header , message = frame .headers , frame .body
443+ else :
444+ header , message = args [0 ], args [1 ]
411445 _log .error ("Error in callback router" )
412446 _log .error (header )
413447 _log .error (message )
0 commit comments