11import json
2-
32import logging
3+ import multiprocessing as mp
4+ import os
45import time
6+ import traceback
57from ..config .config import TopicConfig
6- from ..resource_errors import ExceptionHandler
78from concurrent .futures import ThreadPoolExecutor
89from .abstract .topic_abstract import TopicAbstract
910from ..queue .models .queue_message import QueueMessage
1011from azure .servicebus import ServiceBusClient , ServiceBusMessage
1112from azure .servicebus import AutoLockRenewer
12- import concurrent .futures as cf
1313import threading
1414
1515
16- logging .basicConfig (format = '%(asctime)s %(levelname)-8s %(message)s' , datefmt = '%Y-%m-%d %H:%M:%S' )
1716logger = logging .getLogger ('AzureTopic' )
18- logger .setLevel (logging .INFO )
1917
2018
2119"""
@@ -49,11 +47,24 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes
4947 self .topic_name = topic_name
5048 self .publisher = self .client .get_topic_sender (topic_name = topic_name )
5149 self .executor = ThreadPoolExecutor (max_workers = max_concurrent_messages )
50+ self .callback_execution_mode = self ._get_callback_execution_mode ()
51+ self .callback_process_start_method = self ._get_process_start_method ()
52+ self .process_context = self ._get_process_context ()
5253 self .internal_count = 0
53- self .lock_renewal = AutoLockRenewer (max_workers = max_concurrent_messages )
54- self .max_renewal_duration = 86400 # Renew the message upto 1 day
54+ self .max_renewal_duration = 86400 # Renew the message upto 1 day
55+ self .lock_renewal_margin = 60
56+ renewer_max_workers = max (max_concurrent_messages , 2 )
57+ self .lock_renewal = AutoLockRenewer (
58+ max_lock_renewal_duration = self .max_renewal_duration ,
59+ on_lock_renew_failure = self ._handle_lock_renew_failure ,
60+ max_workers = renewer_max_workers ,
61+ )
62+ # The SDK default renews only in the last 10 seconds of the lock window.
63+ # Start earlier so long-running jobs have more headroom for scheduler jitter.
64+ self .lock_renewal ._renew_period = min (self .lock_renewal_margin , self .max_renewal_duration )
5565 self .wait_time_for_message = 5
5666 self .thread_lock = threading .Lock ()
67+ self .pending_tasks = []
5768
5869
5970 def publish (self , data : QueueMessage ):
@@ -78,63 +89,269 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1):
7889 self .receiver .local_received_messages = 0
7990 while True :
8091 try :
81- to_receive = (self .max_concurrent_messages - self .internal_count )
82- total_messages_to_recieve_more = max_receivable_messages - self .receiver .local_received_messages
83- if max_receivable_messages > 0 :
84- to_receive = min (to_receive , total_messages_to_recieve_more )
92+ self ._settle_completed_tasks ()
93+ to_receive = self ._get_receivable_count (max_receivable_messages = max_receivable_messages )
94+ if max_receivable_messages > 0 and self .receiver .local_received_messages >= max_receivable_messages :
95+ if len (self .pending_tasks ) == 0 :
96+ break
97+ self ._wait_for_pending_tasks (timeout = 0.5 )
98+ continue
8599 if to_receive > 0 :
86100 messages = self .receiver .receive_messages (max_message_count = to_receive , max_wait_time = self .wait_time_for_message )
87101 if not messages or len (messages ) == 0 :
102+ if len (self .pending_tasks ) > 0 :
103+ self ._wait_for_pending_tasks (timeout = 0.5 )
88104 continue
89105 self .receiver .local_received_messages += len (messages )
106+ with self .thread_lock :
107+ self .internal_count += len (messages )
90108 for message in messages :
91- self .lock_renewal .register (self .receiver , message , max_lock_renewal_duration = self .max_renewal_duration )
92- execution_task = self .executor .submit (self .internal_callback , message , callback )
93- execution_task .add_done_callback (lambda x : self .settle_message (x ))
94- if self .receiver .local_received_messages >= max_receivable_messages and max_receivable_messages > 0 : # Break if the messages are more than max_receivable_messages
95- break
109+ execution_task = self ._submit_processing_task (message , callback )
110+ self .lock_renewal .register (
111+ self .receiver ,
112+ message ,
113+ max_lock_renewal_duration = self .max_renewal_duration ,
114+ on_lock_renew_failure = self ._handle_lock_renew_failure ,
115+ )
116+ self .pending_tasks .append ((execution_task , message ))
96117 else :
97- time .sleep (self .wait_time_for_message )
118+ if len (self .pending_tasks ) > 0 :
119+ self ._wait_for_pending_tasks (timeout = 0.5 )
120+ else :
121+ time .sleep (self .wait_time_for_message )
98122 except Exception as e :
99123 logger .error (f'Error in receiving messages: { e } ' )
100124
101125
102- def internal_callback (self , message , callbackfn ):
126+ def internal_callback (self , message_payload , callbackfn ):
103127 """
104128 Internal callback function that processes a message and invokes the callback function.
105129 Args:
106- message (ServiceBusMessage ): The message to process.
130+ message_payload (str ): The message payload to process.
107131 callbackfn (function): The callback function to invoke.
108132 Returns:
109- ServiceBusMessage : The processed message .
133+ dict : The callback status payload .
110134 """
111135 try :
112- with self .thread_lock :
113- self .internal_count += 1 # thread safe.
114- queue_message = QueueMessage .data_from (str (message ))
136+ queue_message = QueueMessage .data_from (message_payload )
115137 callbackfn (queue_message )
116- return [ True ,message ]
138+ return { 'success' : True , 'error' : None }
117139 except Exception as e :
118- logger .error (f'Error in processing message: { e } ' )
119- return [False ,message ]
140+ return {
141+ 'success' : False ,
142+ 'error' : '' .join (traceback .format_exception (type (e ), e , e .__traceback__ )).strip (),
143+ }
120144
121145
122- def settle_message (self , x : cf .Future ):
146+ def settle_message (self , x ):
147+ return self ._settle_task (x )
148+
149+ def _submit_processing_task (self , message , callback ):
150+ message_payload = str (message )
151+ if self .callback_execution_mode == 'process' :
152+ try :
153+ return self ._submit_process_task (message_payload , callback )
154+ except Exception as exc :
155+ logger .warning (
156+ 'Falling back to thread execution for message %s because process start failed: %s' ,
157+ self ._get_message_id (message ),
158+ exc ,
159+ )
160+ return self ._submit_thread_task (message_payload , callback )
161+
162+ def _submit_thread_task (self , message_payload , callback ):
163+ future = self .executor .submit (self .internal_callback , message_payload , callback )
164+ return _FutureExecutionTask (future )
165+
166+ def _submit_process_task (self , message_payload , callback ):
167+ if self .process_context is None :
168+ raise RuntimeError ('Process execution mode is not available for this environment.' )
169+
170+ parent_connection , child_connection = self .process_context .Pipe (duplex = False )
171+ callback_process = self .process_context .Process (
172+ target = _run_callback_in_subprocess ,
173+ args = (message_payload , callback , child_connection ),
174+ )
175+ try :
176+ callback_process .start ()
177+ except Exception :
178+ parent_connection .close ()
179+ child_connection .close ()
180+ raise
181+ child_connection .close ()
182+ return _ProcessExecutionTask (callback_process , parent_connection )
183+
184+ def _get_receivable_count (self , max_receivable_messages = - 1 ):
185+ with self .thread_lock :
186+ available_slots = self .max_concurrent_messages - self .internal_count
187+ if max_receivable_messages > 0 :
188+ remaining_messages = max_receivable_messages - self .receiver .local_received_messages
189+ available_slots = min (available_slots , remaining_messages )
190+ return max (available_slots , 0 )
191+
192+ def _wait_for_pending_tasks (self , timeout = 0.5 ):
193+ if len (self .pending_tasks ) == 0 :
194+ return
195+ if timeout <= 0 :
196+ self ._settle_completed_tasks ()
197+ return
198+ deadline = time .time () + timeout
199+ while time .time () < deadline :
200+ if any (task .done () for task , _ in self .pending_tasks ):
201+ break
202+ time .sleep (min (0.1 , max (deadline - time .time (), 0 )))
203+ self ._settle_completed_tasks ()
204+
205+ def _settle_completed_tasks (self ):
206+ remaining_tasks = []
207+ for future , incoming_message in self .pending_tasks :
208+ if future .done ():
209+ self ._settle_task (future , incoming_message = incoming_message )
210+ else :
211+ remaining_tasks .append ((future , incoming_message ))
212+ self .pending_tasks = remaining_tasks
213+
214+ def _settle_task (self , x , incoming_message = None ):
123215 """
124216 Sets the message as completed and updates the internal count.
125217 Args:
126- x (cf.Future) : The future object representing the message processing.
218+ x: The task object representing the message processing.
127219 """
128- # Lock the internal count
129- with self .thread_lock :
130- self .internal_count -= 1
131- # Check if the future has an exception
132- [is_success ,incoming_message ] = x .result ()
133- if is_success :
134- self .receiver .complete_message (incoming_message )
135- else :
136- print (f'Abandoning message: { incoming_message } ' )
137- self .receiver .abandon_message (incoming_message ) # send back to the topic
138- return
139-
140-
220+ try :
221+ task_result = x .result ()
222+ except Exception as e :
223+ task_result = {
224+ 'success' : False ,
225+ 'error' : f'Callback worker exited before returning a result: { e } ' ,
226+ }
227+
228+ try :
229+ if incoming_message is None :
230+ return
231+ if getattr (incoming_message , '_lock_expired' , False ):
232+ logger .error (
233+ f'Skipping settlement for message { self ._get_message_id (incoming_message )} '
234+ f'because the lock expired at { getattr (incoming_message , "locked_until_utc" , None )} . '
235+ f'auto_renew_error={ getattr (incoming_message , "auto_renew_error" , None )} '
236+ )
237+ return
238+ if task_result .get ('success' ):
239+ self .receiver .complete_message (incoming_message )
240+ else :
241+ logger .error (
242+ 'Processing failed for message %s: %s' ,
243+ self ._get_message_id (incoming_message ),
244+ task_result .get ('error' , 'unknown processing failure' ),
245+ )
246+ self .receiver .abandon_message (incoming_message )
247+ except Exception as e :
248+ logger .error (f'Error in settling message: { e } ' )
249+ finally :
250+ with self .thread_lock :
251+ self .internal_count = max (self .internal_count - 1 , 0 )
252+ return
253+
254+ def _handle_lock_renew_failure (self , renewable , error ):
255+ message_id = self ._get_message_id (renewable )
256+ failure_reason = error or getattr (renewable , 'auto_renew_error' , None ) or 'lock expired before renewal could complete'
257+ logger .error (
258+ f'Error renewing lock for message { message_id } : { failure_reason } ; '
259+ f'locked_until_utc={ getattr (renewable , "locked_until_utc" , None )} '
260+ )
261+
262+ @staticmethod
263+ def _get_message_id (message ):
264+ return getattr (message , 'message_id' , None ) or getattr (message , 'messageId' , 'unknown' )
265+
266+ @staticmethod
267+ def _get_callback_execution_mode ():
268+ value = os .environ .get ('TOPIC_CALLBACK_EXECUTION_MODE' , 'process' )
269+ normalized = str (value ).strip ().lower ()
270+ if normalized in ('process' , 'thread' ):
271+ return normalized
272+ logger .warning (
273+ 'Invalid value for TOPIC_CALLBACK_EXECUTION_MODE: %s. Using default process.' ,
274+ value ,
275+ )
276+ return 'process'
277+
278+ @staticmethod
279+ def _get_process_start_method ():
280+ available_methods = mp .get_all_start_methods ()
281+ default_method = 'fork' if 'fork' in available_methods else mp .get_start_method () or available_methods [0 ]
282+ configured_method = os .environ .get ('TOPIC_CALLBACK_PROCESS_START_METHOD' , default_method )
283+ normalized_method = str (configured_method ).strip ().lower ()
284+ if normalized_method in available_methods :
285+ return normalized_method
286+ logger .warning (
287+ 'Invalid value for TOPIC_CALLBACK_PROCESS_START_METHOD: %s. Using default %s.' ,
288+ configured_method ,
289+ default_method ,
290+ )
291+ return default_method
292+
293+ def _get_process_context (self ):
294+ if self .callback_execution_mode != 'process' :
295+ return None
296+ try :
297+ return mp .get_context (self .callback_process_start_method )
298+ except ValueError :
299+ logger .warning (
300+ 'Process start method %s is unavailable. Falling back to thread execution.' ,
301+ self .callback_process_start_method ,
302+ )
303+ self .callback_execution_mode = 'thread'
304+ return None
305+
306+
307+ def _run_callback_in_subprocess (message_payload , callbackfn , result_connection ):
308+ try :
309+ queue_message = QueueMessage .data_from (message_payload )
310+ callbackfn (queue_message )
311+ result_connection .send ({'success' : True , 'error' : None })
312+ except BaseException as exc : # pragma: no cover - exercised through the parent process wrapper
313+ result_connection .send ({
314+ 'success' : False ,
315+ 'error' : '' .join (traceback .format_exception (type (exc ), exc , exc .__traceback__ )).strip (),
316+ })
317+ finally :
318+ result_connection .close ()
319+
320+
321+ class _FutureExecutionTask :
322+ def __init__ (self , future ):
323+ self ._future = future
324+
325+ def done (self ):
326+ return self ._future .done ()
327+
328+ def result (self ):
329+ return self ._future .result ()
330+
331+
332+ class _ProcessExecutionTask :
333+ def __init__ (self , process , result_connection ):
334+ self ._process = process
335+ self ._result_connection = result_connection
336+ self ._result = None
337+
338+ def done (self ):
339+ return not self ._process .is_alive ()
340+
341+ def result (self ):
342+ if self ._result is not None :
343+ return self ._result
344+
345+ self ._process .join ()
346+ try :
347+ if self ._result_connection .poll ():
348+ self ._result = self ._result_connection .recv ()
349+ else :
350+ self ._result = {
351+ 'success' : False ,
352+ 'error' : f'Callback worker exited with code { self ._process .exitcode } without returning a result.' ,
353+ }
354+ finally :
355+ self ._result_connection .close ()
356+
357+ return self ._result
0 commit comments