44from servc .svc .com .bus import BusComponent , OnConsuming
55from servc .svc .com .cache import CacheComponent
66from servc .svc .config import Config
7- from servc .svc .io .input import EventPayload , InputType
7+ from servc .svc .io .input import EventPayload , InputPayload , InputType , ArgumentArtifact
88from servc .svc .io .output import StatusCode
99from servc .svc .io .response import getAnswerArtifact , getErrorArtifact
10+ from servc .svc .client .send import sendMessage
11+ from servc .svc .idgen .simple import simple as idGenerator
1012
1113RESOLVER = Callable [
1214 [str , BusComponent , CacheComponent , Any , List [Middleware ]],
@@ -46,6 +48,8 @@ class WorkerComponent(Middleware):
4648
4749 _bindToEventExchange : bool
4850
51+ _busClass : type [BusComponent ]
52+
4953 def __init__ (
5054 self ,
5155 route : str ,
@@ -54,6 +58,7 @@ def __init__(
5458 eventResolvers : RESOLVER_MAPPING ,
5559 onConsuming : OnConsuming ,
5660 bus : BusComponent ,
61+ busClass : type [BusComponent ],
5762 cache : CacheComponent ,
5863 config : Config ,
5964 otherComponents : List [Middleware ] = [],
@@ -65,6 +70,7 @@ def __init__(
6570 self ._eventResolvers = eventResolvers
6671 self ._onConsuming = onConsuming
6772 self ._bus = bus
73+ self ._busClass = busClass
6874 self ._cache = cache
6975 self ._config = config
7076 self ._bindToEventExchange = (
@@ -116,8 +122,38 @@ def emitEvent(self, eventName: str, details: Any):
116122
117123 self ._bus .publishMessage (self ._route , eventMessage )
118124
125+ def processPostHooks (self , bus : BusComponent , message : InputPayload , artifact : ArgumentArtifact ):
126+ # print(artifact)
127+ if "hooks" in artifact and "on_complete" in artifact ["hooks" ]:
128+ for hook in artifact ["hooks" ]["on_complete" ]:
129+ if hook ["type" ] == "sendmessage" :
130+ try :
131+ payload : InputPayload = {
132+ "id" : "" ,
133+ "type" : InputType .INPUT .value ,
134+ "route" : hook ["route" ],
135+ "force" : message ["force" ] if "force" in message else False ,
136+ "argumentId" : "" ,
137+ "argument" : {
138+ "method" : hook ["method" ],
139+ "inputs" : {
140+ "id" : message ["id" ],
141+ "method" : artifact ["method" ],
142+ "inputs" : artifact ["inputs" ]
143+ },
144+ }
145+ }
146+ sendMessage (payload , bus , self ._cache , idGenerator )
147+ except Exception as e :
148+ print ("Unable to process post hook" , flush = True )
149+ print (e , flush = True )
150+
119151 def inputProcessor (self , message : Any ) -> StatusCode :
120- bus = self ._bus
152+ bus = self ._busClass (
153+ self ._config .get ("conf.bus.url" ),
154+ self ._config .get ("conf.bus.routemap" ),
155+ self ._config .get ("conf.bus.prefix" ),
156+ )
121157 cache = self ._cache
122158
123159 if "type" not in message or "route" not in message :
@@ -188,13 +224,17 @@ def inputProcessor(self, message: Any) -> StatusCode:
188224 artifact ["inputs" ],
189225 self ._children ,
190226 )
191- cache .setKey (message ["id" ], getAnswerArtifact (message ["id" ], response ))
227+ cache .setKey (message ["id" ], getAnswerArtifact (
228+ message ["id" ], response ))
229+ self .processPostHooks (bus , message , artifact )
192230 return StatusCode .OK
193231 except Exception as e :
194232 cache .setKey (
195233 message ["id" ],
196- getErrorArtifact (message ["id" ], str (e ), StatusCode .SERVER_ERROR ),
234+ getErrorArtifact (message ["id" ], str (
235+ e ), StatusCode .SERVER_ERROR ),
197236 )
237+ self .processPostHooks (bus , message , artifact )
198238 return StatusCode .SERVER_ERROR
199239
200240 cache .setKey (
0 commit comments