@@ -36,6 +36,10 @@ class ConsumerComponent(ServiceComponent):
3636
3737 _onConsuming : OnConsuming
3838
39+ _busClass : type [BusComponent ]
40+
41+ _busArgs : List [Any ]
42+
3943 def __init__ (
4044 self ,
4145 route : str ,
@@ -46,6 +50,8 @@ def __init__(
4650 onConsuming : OnConsuming ,
4751 bus : BusComponent ,
4852 cache : CacheComponent ,
53+ busClass : type [BusComponent ],
54+ busArgs : List [Any ],
4955 otherComponents : List [ServiceComponent ] = [],
5056 ):
5157 super ().__init__ ()
@@ -57,6 +63,8 @@ def __init__(
5763 self ._onConsuming = onConsuming
5864 self ._bus = bus
5965 self ._cache = cache
66+ self ._busClass = busClass
67+ self ._busArgs = busArgs
6068
6169 self ._resolvers ["healthz" ] = lambda * args : self .healthz (self , * args )
6270
@@ -98,7 +106,7 @@ def healthz(
98106 return StatusCode .OK
99107
100108 def emitEvent (self , eventName : str , details : Any ):
101- bus = self ._bus
109+ bus = self ._busClass ( * self . _busArgs )
102110 eventMessage : InputPayload = {
103111 "type" : InputType .EVENT .value ,
104112 "event" : eventName ,
@@ -108,9 +116,10 @@ def emitEvent(self, eventName: str, details: Any):
108116 }
109117
110118 bus .publishMessage (self ._route , eventMessage , self ._emitFunction )
119+ bus .close ()
111120
112121 def inputProcessor (self , message : Any ) -> StatusCode :
113- bus = self ._bus
122+ bus = self ._busClass ( * self . _busArgs )
114123 cache = self ._cache
115124
116125 if "type" not in message or "route" not in message :
0 commit comments