From 30e447b527d6744b5ce72e9460f21e22675a3126 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Tue, 17 Mar 2026 15:52:52 +0530 Subject: [PATCH 1/5] Feature fix lock issue --- CHANGELOG.md | 8 +- MANIFEST.in | 1 + src/python_ms_core/core/topic/azure_topic.py | 95 ++++++++++++++----- src/python_ms_core/version.py | 2 +- .../unit_tests/test_topic/test_azure_topic.py | 70 ++++++++++++++ version.py | 1 + 6 files changed, 152 insertions(+), 25 deletions(-) create mode 100644 MANIFEST.in create mode 100644 tests/unit_tests/test_topic/test_azure_topic.py create mode 100644 version.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f9d565..d1ee198 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Change log +# Version 0.0.25 +### Fixes +- **Azure Topic Settlement Stability:** Moved Azure Service Bus message settlement back onto the receiver-owning loop instead of settling from worker callback threads. This keeps receive and complete/abandon operations on the same receiver flow for long-running jobs. +- **Receiver Slot Tracking:** Reserved and released in-flight message slots on the receive loop so concurrency limits remain accurate while messages are still processing. +- **Lock Renewal Diagnostics:** Added logging for Service Bus lock-renew failures to make long-running lock-loss issues visible before final settlement. + # Version 0.0.23 - Updated unit test cases pipeline - Added support to upload test cases results on Azure blob @@ -109,4 +115,4 @@ Reference task: - Added classes and methods - Topic - publish - - subscribe \ No newline at end of file + - subscribe diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..540b720 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include requirements.txt \ No newline at end of file diff --git a/src/python_ms_core/core/topic/azure_topic.py b/src/python_ms_core/core/topic/azure_topic.py index deb25c3..05eab2b 100644 --- a/src/python_ms_core/core/topic/azure_topic.py +++ b/src/python_ms_core/core/topic/azure_topic.py @@ -54,6 +54,7 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes self.max_renewal_duration = 86400 # Renew the message upto 1 day self.wait_time_for_message = 5 self.thread_lock = threading.Lock() + self.pending_tasks = [] def publish(self, data: QueueMessage): @@ -78,23 +79,36 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1): self.receiver.local_received_messages = 0 while True: try: - to_receive = (self.max_concurrent_messages - self.internal_count) - total_messages_to_recieve_more = max_receivable_messages - self.receiver.local_received_messages - if max_receivable_messages > 0: - to_receive = min(to_receive, total_messages_to_recieve_more) + self._settle_completed_tasks() + to_receive = self._get_receivable_count(max_receivable_messages=max_receivable_messages) + if max_receivable_messages > 0 and self.receiver.local_received_messages >= max_receivable_messages: + if len(self.pending_tasks) == 0: + break + self._wait_for_pending_tasks(timeout=0.5) + continue if to_receive > 0: messages = self.receiver.receive_messages(max_message_count=to_receive, max_wait_time=self.wait_time_for_message) if not messages or len(messages) == 0: + if len(self.pending_tasks) > 0: + self._wait_for_pending_tasks(timeout=0.5) continue self.receiver.local_received_messages += len(messages) + with self.thread_lock: + self.internal_count += len(messages) for message in messages: - self.lock_renewal.register(self.receiver, message, max_lock_renewal_duration=self.max_renewal_duration) + self.lock_renewal.register( + self.receiver, + message, + max_lock_renewal_duration=self.max_renewal_duration, + on_lock_renew_failure=self._handle_lock_renew_failure, + ) execution_task = self.executor.submit(self.internal_callback, message, callback) - execution_task.add_done_callback(lambda x: self.settle_message(x)) - if self.receiver.local_received_messages >= max_receivable_messages and max_receivable_messages > 0: # Break if the messages are more than max_receivable_messages - break + self.pending_tasks.append((execution_task, message)) else: - time.sleep(self.wait_time_for_message) + if len(self.pending_tasks) > 0: + self._wait_for_pending_tasks(timeout=0.5) + else: + time.sleep(self.wait_time_for_message) except Exception as e: logger.error(f'Error in receiving messages: {e}') @@ -109,8 +123,6 @@ def internal_callback(self, message, callbackfn): ServiceBusMessage: The processed message. """ try: - with self.thread_lock: - self.internal_count += 1 # thread safe. queue_message = QueueMessage.data_from(str(message)) callbackfn(queue_message) return [True,message] @@ -120,21 +132,58 @@ def internal_callback(self, message, callbackfn): def settle_message(self, x: cf.Future): + return self._settle_task(x) + + def _get_receivable_count(self, max_receivable_messages=-1): + with self.thread_lock: + available_slots = self.max_concurrent_messages - self.internal_count + if max_receivable_messages > 0: + remaining_messages = max_receivable_messages - self.receiver.local_received_messages + available_slots = min(available_slots, remaining_messages) + return max(available_slots, 0) + + def _wait_for_pending_tasks(self, timeout=0.5): + if len(self.pending_tasks) == 0: + return + futures = [future for future, _ in self.pending_tasks] + cf.wait(futures, timeout=timeout, return_when=cf.FIRST_COMPLETED) + self._settle_completed_tasks() + + def _settle_completed_tasks(self): + remaining_tasks = [] + for future, incoming_message in self.pending_tasks: + if future.done(): + self._settle_task(future, incoming_message=incoming_message) + else: + remaining_tasks.append((future, incoming_message)) + self.pending_tasks = remaining_tasks + + def _settle_task(self, x: cf.Future, incoming_message=None): """ Sets the message as completed and updates the internal count. Args: x (cf.Future): The future object representing the message processing. """ - # Lock the internal count - with self.thread_lock: - self.internal_count -= 1 - # Check if the future has an exception - [is_success,incoming_message] = x.result() - if is_success: - self.receiver.complete_message(incoming_message) - else: - print(f'Abandoning message: {incoming_message}') - self.receiver.abandon_message(incoming_message) # send back to the topic - return + try: + [is_success, settled_message] = x.result() + if settled_message is not None: + incoming_message = settled_message + if incoming_message is None: + return + if is_success: + self.receiver.complete_message(incoming_message) + else: + logger.info(f'Abandoning message: {incoming_message}') + self.receiver.abandon_message(incoming_message) # send back to the topic + except Exception as e: + logger.error(f'Error in settling message: {e}') + finally: + with self.thread_lock: + self.internal_count = max(self.internal_count - 1, 0) + return + + def _handle_lock_renew_failure(self, renewable, error): + message_id = getattr(renewable, 'message_id', None) or getattr(renewable, 'messageId', 'unknown') + logger.error(f'Error renewing lock for message {message_id}: {error}') - \ No newline at end of file + diff --git a/src/python_ms_core/version.py b/src/python_ms_core/version.py index 071e34f..858d0a1 100644 --- a/src/python_ms_core/version.py +++ b/src/python_ms_core/version.py @@ -1 +1 @@ -__version__ = '0.0.24' \ No newline at end of file +__version__ = '0.0.25' diff --git a/tests/unit_tests/test_topic/test_azure_topic.py b/tests/unit_tests/test_topic/test_azure_topic.py new file mode 100644 index 0000000..330ced7 --- /dev/null +++ b/tests/unit_tests/test_topic/test_azure_topic.py @@ -0,0 +1,70 @@ +import unittest +import concurrent.futures as cf +from unittest.mock import MagicMock, patch + +from src.python_ms_core.core.topic.azure_topic import AzureTopic +from src.python_ms_core.core.queue.models.queue_message import QueueMessage + + +class TestAzureTopic(unittest.TestCase): + + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_subscribe_settles_completed_tasks_on_receiver_loop(self, mock_service_bus_client, mock_auto_lock_renewer): + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_message = MagicMock() + mock_future = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_client.get_subscription_receiver.return_value = mock_receiver + mock_receiver.receive_messages.side_effect = [[mock_message]] + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + callback = MagicMock() + + def submit_side_effect(fn, *args, **kwargs): + mock_future = cf.Future() + mock_future.set_result(fn(*args, **kwargs)) + return mock_future + + topic.executor.submit = MagicMock(side_effect=submit_side_effect) + with patch.object(QueueMessage, 'data_from', return_value=QueueMessage()): + topic.subscribe(subscription='mock-subscription', callback=callback, max_receivable_messages=1) + + callback.assert_called_once() + mock_auto_lock_renewer.return_value.register.assert_called_once() + mock_receiver.complete_message.assert_called_once_with(mock_message) + mock_receiver.abandon_message.assert_not_called() + + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_settle_task_logs_error_and_releases_slot(self, mock_service_bus_client, mock_auto_lock_renewer, mock_logger): + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_message = MagicMock() + mock_future = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_client.get_subscription_receiver.return_value = mock_receiver + mock_future.result.return_value = [True, mock_message] + mock_receiver.complete_message.side_effect = Exception('Mocked settlement failure') + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic.receiver = mock_receiver + topic.internal_count = 1 + + topic._settle_task(mock_future, incoming_message=mock_message) + + mock_receiver.complete_message.assert_called_once_with(mock_message) + mock_logger.error.assert_called_once_with('Error in settling message: Mocked settlement failure') + self.assertEqual(topic.internal_count, 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/version.py b/version.py new file mode 100644 index 0000000..494cb0a --- /dev/null +++ b/version.py @@ -0,0 +1 @@ +version = '0.0.25' From 7c4a4ad16d5e8825587366d91a17dd3b71598bb6 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Tue, 17 Mar 2026 23:12:13 +0530 Subject: [PATCH 2/5] Trying from fix lock issue --- src/python_ms_core/core/topic/azure_topic.py | 146 +++++++++++++++++- src/python_ms_core/version.py | 2 +- .../unit_tests/test_topic/test_azure_topic.py | 137 ++++++++++++++++ version.py | 5 +- 4 files changed, 282 insertions(+), 8 deletions(-) diff --git a/src/python_ms_core/core/topic/azure_topic.py b/src/python_ms_core/core/topic/azure_topic.py index 05eab2b..a9e3f13 100644 --- a/src/python_ms_core/core/topic/azure_topic.py +++ b/src/python_ms_core/core/topic/azure_topic.py @@ -1,6 +1,8 @@ import json +import gc import logging +import os import time from ..config.config import TopicConfig from ..resource_errors import ExceptionHandler @@ -12,6 +14,11 @@ import concurrent.futures as cf import threading +try: + import psutil +except ImportError: # pragma: no cover - dependency exists in the package, but keep logging resilient. + psutil = None + logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S') logger = logging.getLogger('AzureTopic') @@ -50,11 +57,37 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes self.publisher = self.client.get_topic_sender(topic_name=topic_name) self.executor = ThreadPoolExecutor(max_workers=max_concurrent_messages) self.internal_count = 0 - self.lock_renewal = AutoLockRenewer(max_workers=max_concurrent_messages) - self.max_renewal_duration = 86400 # Renew the message upto 1 day + self.max_renewal_duration = self._get_positive_int_from_env( + 'TOPIC_MAX_LOCK_RENEWAL_DURATION_SECONDS', + 86400, + ) # Renew the message upto 1 day + self.lock_renewal_margin = self._get_positive_int_from_env( + 'TOPIC_LOCK_RENEWAL_MARGIN_SECONDS', + 60, + ) + renewer_max_workers = max(max_concurrent_messages, 2) + self.lock_renewal = AutoLockRenewer( + max_lock_renewal_duration=self.max_renewal_duration, + on_lock_renew_failure=self._handle_lock_renew_failure, + max_workers=renewer_max_workers, + ) + # The SDK default renews only in the last 10 seconds of the lock window. + # Start earlier so long-running jobs have more headroom for scheduler jitter. + self.lock_renewal._renew_period = min(self.lock_renewal_margin, self.max_renewal_duration) + self.lock_renew_receiver = _LockRenewLoggingReceiver(self) self.wait_time_for_message = 5 self.thread_lock = threading.Lock() self.pending_tasks = [] + self._process = None + self._prime_runtime_samplers() + logger.info( + 'Configured lock renewal for topic %s: max_lock_renewal_duration=%s seconds, ' + 'renew_margin=%s seconds, renewer_max_workers=%s', + self.topic_name, + self.max_renewal_duration, + self.lock_renewal_margin, + renewer_max_workers, + ) def publish(self, data: QueueMessage): @@ -97,7 +130,7 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1): self.internal_count += len(messages) for message in messages: self.lock_renewal.register( - self.receiver, + self.lock_renew_receiver, message, max_lock_renewal_duration=self.max_renewal_duration, on_lock_renew_failure=self._handle_lock_renew_failure, @@ -170,6 +203,13 @@ def _settle_task(self, x: cf.Future, incoming_message=None): incoming_message = settled_message if incoming_message is None: return + if getattr(incoming_message, '_lock_expired', False): + logger.error( + f'Skipping settlement for message {self._get_message_id(incoming_message)} ' + f'because the lock expired at {getattr(incoming_message, "locked_until_utc", None)}. ' + f'auto_renew_error={getattr(incoming_message, "auto_renew_error", None)}' + ) + return if is_success: self.receiver.complete_message(incoming_message) else: @@ -183,7 +223,101 @@ def _settle_task(self, x: cf.Future, incoming_message=None): return def _handle_lock_renew_failure(self, renewable, error): - message_id = getattr(renewable, 'message_id', None) or getattr(renewable, 'messageId', 'unknown') - logger.error(f'Error renewing lock for message {message_id}: {error}') + message_id = self._get_message_id(renewable) + failure_reason = error or getattr(renewable, 'auto_renew_error', None) or 'lock expired before renewal could complete' + logger.error( + f'Error renewing lock for message {message_id}: {failure_reason}; ' + f'locked_until_utc={getattr(renewable, "locked_until_utc", None)}; ' + f'runtime_snapshot={self._get_runtime_snapshot()}' + ) - + @staticmethod + def _get_message_id(message): + return getattr(message, 'message_id', None) or getattr(message, 'messageId', 'unknown') + + def _prime_runtime_samplers(self): + if psutil is None: + return + try: + self._process = psutil.Process(os.getpid()) + self._process.cpu_percent(interval=None) + psutil.cpu_percent(interval=None) + except Exception: # pragma: no cover - best effort diagnostics + self._process = None + + def _get_runtime_snapshot(self): + return f'{self._get_memory_snapshot()}, {self._get_cpu_snapshot()}, {self._get_gc_snapshot()}' + + def _get_memory_snapshot(self): + if self._process is None: + return 'memory=psutil-unavailable' + try: + memory_info = self._process.memory_info() + rss_mb = memory_info.rss / (1024 * 1024) + vms_mb = memory_info.vms / (1024 * 1024) + return f'memory=rss_mb={rss_mb:.2f}, vms_mb={vms_mb:.2f}, num_threads={self._process.num_threads()}' + except Exception as exc: # pragma: no cover - diagnostic fallback + return f'memory=unavailable({exc})' + + def _get_cpu_snapshot(self): + if self._process is None: + return 'cpu=psutil-unavailable' + try: + process_cpu_percent = self._process.cpu_percent(interval=None) + system_cpu_percent = psutil.cpu_percent(interval=None) + return ( + f'cpu=process_percent={process_cpu_percent:.2f}, ' + f'system_percent={system_cpu_percent:.2f}' + ) + except Exception as exc: # pragma: no cover - diagnostic fallback + return f'cpu=unavailable({exc})' + + @staticmethod + def _get_gc_snapshot(): + try: + gc_counts = gc.get_count() + gc_thresholds = gc.get_threshold() + gc_stats = gc.get_stats() + generation_summaries = [] + for generation, stats in enumerate(gc_stats): + generation_summaries.append( + 'gen{generation}[collections={collections}, collected={collected}, uncollectable={uncollectable}]'.format( + generation=generation, + collections=stats.get('collections', 0), + collected=stats.get('collected', 0), + uncollectable=stats.get('uncollectable', 0), + ) + ) + return ( + f'gc=enabled={gc.isenabled()}, counts={gc_counts}, thresholds={gc_thresholds}, ' + f'stats={"; ".join(generation_summaries)}' + ) + except Exception as exc: # pragma: no cover - diagnostic fallback + return f'gc=unavailable({exc})' + + @staticmethod + def _get_positive_int_from_env(name, default): + value = os.environ.get(name) + if value is None: + return default + try: + parsed = int(value) + if parsed > 0: + return parsed + except (TypeError, ValueError): + pass + logger.warning(f'Invalid value for {name}: {value}. Using default {default}.') + return default + +class _LockRenewLoggingReceiver: + def __init__(self, topic): + self._topic = topic + + def renew_message_lock(self, renewable): + logger.info( + 'Attempting lock renewal for message %s; locked_until_utc=%s; runtime_snapshot=%s', + self._topic._get_message_id(renewable), + getattr(renewable, 'locked_until_utc', None), + self._topic._get_runtime_snapshot(), + ) + return self._topic.receiver.renew_message_lock(renewable) diff --git a/src/python_ms_core/version.py b/src/python_ms_core/version.py index 858d0a1..58bdbd8 100644 --- a/src/python_ms_core/version.py +++ b/src/python_ms_core/version.py @@ -1 +1 @@ -__version__ = '0.0.25' +__version__ = '0.2.5.1' diff --git a/tests/unit_tests/test_topic/test_azure_topic.py b/tests/unit_tests/test_topic/test_azure_topic.py index 330ced7..2be079e 100644 --- a/tests/unit_tests/test_topic/test_azure_topic.py +++ b/tests/unit_tests/test_topic/test_azure_topic.py @@ -8,6 +8,25 @@ class TestAzureTopic(unittest.TestCase): + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_init_sets_long_running_lock_renewal_defaults(self, mock_service_bus_client, mock_auto_lock_renewer): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_renewer = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_auto_lock_renewer.return_value = mock_renewer + + AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + + mock_auto_lock_renewer.assert_called_once() + _, kwargs = mock_auto_lock_renewer.call_args + self.assertEqual(kwargs['max_lock_renewal_duration'], 86400) + self.assertEqual(kwargs['max_workers'], 2) + self.assertEqual(mock_renewer._renew_period, 60) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') def test_subscribe_settles_completed_tasks_on_receiver_loop(self, mock_service_bus_client, mock_auto_lock_renewer): @@ -16,6 +35,7 @@ def test_subscribe_settles_completed_tasks_on_receiver_loop(self, mock_service_b mock_message = MagicMock() mock_future = MagicMock() mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message._lock_expired = False mock_service_bus_client.from_connection_string.return_value = mock_client mock_client.get_topic_sender.return_value = MagicMock() @@ -48,6 +68,7 @@ def test_settle_task_logs_error_and_releases_slot(self, mock_service_bus_client, mock_message = MagicMock() mock_future = MagicMock() mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message._lock_expired = False mock_service_bus_client.from_connection_string.return_value = mock_client mock_client.get_topic_sender.return_value = MagicMock() @@ -65,6 +86,122 @@ def test_settle_task_logs_error_and_releases_slot(self, mock_service_bus_client, mock_logger.error.assert_called_once_with('Error in settling message: Mocked settlement failure') self.assertEqual(topic.internal_count, 0) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_settle_task_skips_expired_message(self, mock_service_bus_client, mock_auto_lock_renewer, mock_logger): + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_message = MagicMock() + mock_future = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + + mock_message._lock_expired = True + mock_message.message_id = 'message-1' + mock_message.locked_until_utc = '2026-03-17T09:39:28Z' + mock_message.auto_renew_error = None + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_client.get_subscription_receiver.return_value = mock_receiver + mock_future.result.return_value = [True, mock_message] + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic.receiver = mock_receiver + topic.internal_count = 1 + + topic._settle_task(mock_future, incoming_message=mock_message) + + mock_receiver.complete_message.assert_not_called() + mock_receiver.abandon_message.assert_not_called() + mock_logger.error.assert_called_once_with( + 'Skipping settlement for message message-1 because the lock expired at ' + '2026-03-17T09:39:28Z. auto_renew_error=None' + ) + self.assertEqual(topic.internal_count, 0) + + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_handle_lock_renew_failure_logs_when_sdk_returns_no_error( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_logger, + ): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_message.message_id = 'message-1' + mock_message.locked_until_utc = '2026-03-17T09:39:28Z' + mock_message.auto_renew_error = None + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic._get_runtime_snapshot = MagicMock( + return_value=( + 'memory=rss_mb=128.00, vms_mb=256.00, num_threads=4, ' + 'cpu=process_percent=80.00, system_percent=91.00, ' + 'gc=enabled=True, counts=(1, 2, 3), thresholds=(700, 10, 10), ' + 'stats=gen0[collections=1, collected=2, uncollectable=0]' + ) + ) + + topic._handle_lock_renew_failure(mock_message, None) + + mock_logger.error.assert_called_once_with( + 'Error renewing lock for message message-1: lock expired before renewal could complete; ' + 'locked_until_utc=2026-03-17T09:39:28Z; ' + 'runtime_snapshot=memory=rss_mb=128.00, vms_mb=256.00, num_threads=4, ' + 'cpu=process_percent=80.00, system_percent=91.00, ' + 'gc=enabled=True, counts=(1, 2, 3), thresholds=(700, 10, 10), ' + 'stats=gen0[collections=1, collected=2, uncollectable=0]' + ) + + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_lock_renew_attempt_logs_memory_snapshot( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_logger, + ): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_auto_lock_renewer.return_value = MagicMock() + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic._get_runtime_snapshot = MagicMock( + return_value=( + 'memory=rss_mb=64.00, vms_mb=128.00, num_threads=3, ' + 'cpu=process_percent=55.00, system_percent=70.00, ' + 'gc=enabled=True, counts=(4, 5, 6), thresholds=(700, 10, 10), ' + 'stats=gen0[collections=3, collected=10, uncollectable=0]' + ) + ) + + mock_message = MagicMock() + mock_message.message_id = 'message-2' + mock_message.locked_until_utc = '2026-03-17T09:39:33Z' + topic.receiver = MagicMock() + + topic.lock_renew_receiver.renew_message_lock(mock_message) + + mock_logger.info.assert_any_call( + 'Attempting lock renewal for message %s; locked_until_utc=%s; runtime_snapshot=%s', + 'message-2', + '2026-03-17T09:39:33Z', + 'memory=rss_mb=64.00, vms_mb=128.00, num_threads=3, ' + 'cpu=process_percent=55.00, system_percent=70.00, ' + 'gc=enabled=True, counts=(4, 5, 6), thresholds=(700, 10, 10), ' + 'stats=gen0[collections=3, collected=10, uncollectable=0]', + ) + topic.receiver.renew_message_lock.assert_called_once_with(mock_message) + if __name__ == '__main__': unittest.main() diff --git a/version.py b/version.py index 494cb0a..f0cce3d 100644 --- a/version.py +++ b/version.py @@ -1 +1,4 @@ -version = '0.0.25' +version = '0.2.5.1' +lastCommit = '30e447b527d6744b5ce72e9460f21e22675a3126' +lastCommitShort = '30e4' +buildDate = '2026-03-17' From 7284ee8ae462d5a3295a201145d3dc67f851274c Mon Sep 17 00:00:00 2001 From: sujata-m Date: Wed, 18 Mar 2026 15:23:11 +0530 Subject: [PATCH 3/5] Trying from fix lock issue --- src/python_ms_core/core/topic/azure_topic.py | 197 ++++++++++-- src/python_ms_core/version.py | 2 +- .../unit_tests/test_topic/test_azure_topic.py | 288 ++++++++++++++++-- version.py | 8 +- 4 files changed, 440 insertions(+), 55 deletions(-) diff --git a/src/python_ms_core/core/topic/azure_topic.py b/src/python_ms_core/core/topic/azure_topic.py index a9e3f13..dc8eb38 100644 --- a/src/python_ms_core/core/topic/azure_topic.py +++ b/src/python_ms_core/core/topic/azure_topic.py @@ -1,17 +1,16 @@ import json - import gc import logging +import multiprocessing as mp import os import time +import traceback from ..config.config import TopicConfig -from ..resource_errors import ExceptionHandler from concurrent.futures import ThreadPoolExecutor from .abstract.topic_abstract import TopicAbstract from ..queue.models.queue_message import QueueMessage from azure.servicebus import ServiceBusClient, ServiceBusMessage from azure.servicebus import AutoLockRenewer -import concurrent.futures as cf import threading try: @@ -56,6 +55,9 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes self.topic_name = topic_name self.publisher = self.client.get_topic_sender(topic_name=topic_name) self.executor = ThreadPoolExecutor(max_workers=max_concurrent_messages) + self.callback_execution_mode = self._get_callback_execution_mode() + self.callback_process_start_method = self._get_process_start_method() + self.process_context = self._get_process_context() self.internal_count = 0 self.max_renewal_duration = self._get_positive_int_from_env( 'TOPIC_MAX_LOCK_RENEWAL_DURATION_SECONDS', @@ -81,9 +83,11 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes self._process = None self._prime_runtime_samplers() logger.info( - 'Configured lock renewal for topic %s: max_lock_renewal_duration=%s seconds, ' - 'renew_margin=%s seconds, renewer_max_workers=%s', + 'Configured AzureTopic %s: execution_mode=%s, process_start_method=%s, ' + 'max_lock_renewal_duration=%s seconds, renew_margin=%s seconds, renewer_max_workers=%s', self.topic_name, + self.callback_execution_mode, + self.callback_process_start_method, self.max_renewal_duration, self.lock_renewal_margin, renewer_max_workers, @@ -129,13 +133,13 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1): with self.thread_lock: self.internal_count += len(messages) for message in messages: + execution_task = self._submit_processing_task(message, callback) self.lock_renewal.register( self.lock_renew_receiver, message, max_lock_renewal_duration=self.max_renewal_duration, on_lock_renew_failure=self._handle_lock_renew_failure, ) - execution_task = self.executor.submit(self.internal_callback, message, callback) self.pending_tasks.append((execution_task, message)) else: if len(self.pending_tasks) > 0: @@ -146,27 +150,64 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1): logger.error(f'Error in receiving messages: {e}') - def internal_callback(self, message, callbackfn): + def internal_callback(self, message_payload, callbackfn): """ Internal callback function that processes a message and invokes the callback function. Args: - message (ServiceBusMessage): The message to process. + message_payload (str): The message payload to process. callbackfn (function): The callback function to invoke. Returns: - ServiceBusMessage: The processed message. + dict: The callback status payload. """ try: - queue_message = QueueMessage.data_from(str(message)) + queue_message = QueueMessage.data_from(message_payload) callbackfn(queue_message) - return [True,message] + return {'success': True, 'error': None} except Exception as e: - logger.error(f'Error in processing message: {e}') - return [False,message] + return { + 'success': False, + 'error': ''.join(traceback.format_exception(type(e), e, e.__traceback__)).strip(), + } - def settle_message(self, x: cf.Future): + def settle_message(self, x): return self._settle_task(x) + def _submit_processing_task(self, message, callback): + message_payload = str(message) + if self.callback_execution_mode == 'process': + try: + return self._submit_process_task(message_payload, callback) + except Exception as exc: + logger.warning( + 'Falling back to thread execution for message %s because process start failed: %s', + self._get_message_id(message), + exc, + ) + return self._submit_thread_task(message_payload, callback) + + def _submit_thread_task(self, message_payload, callback): + future = self.executor.submit(self.internal_callback, message_payload, callback) + return _FutureExecutionTask(future) + + def _submit_process_task(self, message_payload, callback): + if self.process_context is None: + raise RuntimeError('Process execution mode is not available for this environment.') + + parent_connection, child_connection = self.process_context.Pipe(duplex=False) + callback_process = self.process_context.Process( + target=_run_callback_in_subprocess, + args=(message_payload, callback, child_connection), + ) + try: + callback_process.start() + except Exception: + parent_connection.close() + child_connection.close() + raise + child_connection.close() + return _ProcessExecutionTask(callback_process, parent_connection) + def _get_receivable_count(self, max_receivable_messages=-1): with self.thread_lock: available_slots = self.max_concurrent_messages - self.internal_count @@ -178,8 +219,14 @@ def _get_receivable_count(self, max_receivable_messages=-1): def _wait_for_pending_tasks(self, timeout=0.5): if len(self.pending_tasks) == 0: return - futures = [future for future, _ in self.pending_tasks] - cf.wait(futures, timeout=timeout, return_when=cf.FIRST_COMPLETED) + if timeout <= 0: + self._settle_completed_tasks() + return + deadline = time.time() + timeout + while time.time() < deadline: + if any(task.done() for task, _ in self.pending_tasks): + break + time.sleep(min(0.1, max(deadline - time.time(), 0))) self._settle_completed_tasks() def _settle_completed_tasks(self): @@ -191,16 +238,21 @@ def _settle_completed_tasks(self): remaining_tasks.append((future, incoming_message)) self.pending_tasks = remaining_tasks - def _settle_task(self, x: cf.Future, incoming_message=None): + def _settle_task(self, x, incoming_message=None): """ Sets the message as completed and updates the internal count. Args: - x (cf.Future): The future object representing the message processing. + x: The task object representing the message processing. """ try: - [is_success, settled_message] = x.result() - if settled_message is not None: - incoming_message = settled_message + task_result = x.result() + except Exception as e: + task_result = { + 'success': False, + 'error': f'Callback worker exited before returning a result: {e}', + } + + try: if incoming_message is None: return if getattr(incoming_message, '_lock_expired', False): @@ -210,11 +262,15 @@ def _settle_task(self, x: cf.Future, incoming_message=None): f'auto_renew_error={getattr(incoming_message, "auto_renew_error", None)}' ) return - if is_success: + if task_result.get('success'): self.receiver.complete_message(incoming_message) else: - logger.info(f'Abandoning message: {incoming_message}') - self.receiver.abandon_message(incoming_message) # send back to the topic + logger.error( + 'Processing failed for message %s: %s', + self._get_message_id(incoming_message), + task_result.get('error', 'unknown processing failure'), + ) + self.receiver.abandon_message(incoming_message) except Exception as e: logger.error(f'Error in settling message: {e}') finally: @@ -309,6 +365,46 @@ def _get_positive_int_from_env(name, default): logger.warning(f'Invalid value for {name}: {value}. Using default {default}.') return default + @staticmethod + def _get_callback_execution_mode(): + value = os.environ.get('TOPIC_CALLBACK_EXECUTION_MODE', 'process') + normalized = str(value).strip().lower() + if normalized in ('process', 'thread'): + return normalized + logger.warning( + 'Invalid value for TOPIC_CALLBACK_EXECUTION_MODE: %s. Using default process.', + value, + ) + return 'process' + + @staticmethod + def _get_process_start_method(): + available_methods = mp.get_all_start_methods() + default_method = 'fork' if 'fork' in available_methods else mp.get_start_method() or available_methods[0] + configured_method = os.environ.get('TOPIC_CALLBACK_PROCESS_START_METHOD', default_method) + normalized_method = str(configured_method).strip().lower() + if normalized_method in available_methods: + return normalized_method + logger.warning( + 'Invalid value for TOPIC_CALLBACK_PROCESS_START_METHOD: %s. Using default %s.', + configured_method, + default_method, + ) + return default_method + + def _get_process_context(self): + if self.callback_execution_mode != 'process': + return None + try: + return mp.get_context(self.callback_process_start_method) + except ValueError: + logger.warning( + 'Process start method %s is unavailable. Falling back to thread execution.', + self.callback_process_start_method, + ) + self.callback_execution_mode = 'thread' + return None + class _LockRenewLoggingReceiver: def __init__(self, topic): self._topic = topic @@ -321,3 +417,56 @@ def renew_message_lock(self, renewable): self._topic._get_runtime_snapshot(), ) return self._topic.receiver.renew_message_lock(renewable) + + +def _run_callback_in_subprocess(message_payload, callbackfn, result_connection): + try: + queue_message = QueueMessage.data_from(message_payload) + callbackfn(queue_message) + result_connection.send({'success': True, 'error': None}) + except BaseException as exc: # pragma: no cover - exercised through the parent process wrapper + result_connection.send({ + 'success': False, + 'error': ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__)).strip(), + }) + finally: + result_connection.close() + + +class _FutureExecutionTask: + def __init__(self, future): + self._future = future + + def done(self): + return self._future.done() + + def result(self): + return self._future.result() + + +class _ProcessExecutionTask: + def __init__(self, process, result_connection): + self._process = process + self._result_connection = result_connection + self._result = None + + def done(self): + return not self._process.is_alive() + + def result(self): + if self._result is not None: + return self._result + + self._process.join() + try: + if self._result_connection.poll(): + self._result = self._result_connection.recv() + else: + self._result = { + 'success': False, + 'error': f'Callback worker exited with code {self._process.exitcode} without returning a result.', + } + finally: + self._result_connection.close() + + return self._result diff --git a/src/python_ms_core/version.py b/src/python_ms_core/version.py index 58bdbd8..c628dd5 100644 --- a/src/python_ms_core/version.py +++ b/src/python_ms_core/version.py @@ -1 +1 @@ -__version__ = '0.2.5.1' +__version__ = '0.2.5.2' diff --git a/tests/unit_tests/test_topic/test_azure_topic.py b/tests/unit_tests/test_topic/test_azure_topic.py index 2be079e..f8d9954 100644 --- a/tests/unit_tests/test_topic/test_azure_topic.py +++ b/tests/unit_tests/test_topic/test_azure_topic.py @@ -1,39 +1,149 @@ +import os import unittest -import concurrent.futures as cf from unittest.mock import MagicMock, patch from src.python_ms_core.core.topic.azure_topic import AzureTopic -from src.python_ms_core.core.queue.models.queue_message import QueueMessage + + +class CompletedTask: + def __init__(self, result=None, error=None): + self._result = result + self._error = error + + def done(self): + return True + + def result(self): + if self._error is not None: + raise self._error + return self._result class TestAzureTopic(unittest.TestCase): + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') - def test_init_sets_long_running_lock_renewal_defaults(self, mock_service_bus_client, mock_auto_lock_renewer): + def test_init_sets_process_execution_defaults( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + ): mock_client = MagicMock() mock_config = MagicMock(connection_string='Endpoint=sb://test/') mock_renewer = MagicMock() + mock_process_context = MagicMock() mock_service_bus_client.from_connection_string.return_value = mock_client mock_client.get_topic_sender.return_value = MagicMock() mock_auto_lock_renewer.return_value = mock_renewer + mock_get_context.return_value = mock_process_context - AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + self.assertEqual(topic.callback_execution_mode, 'process') + self.assertEqual(topic.callback_process_start_method, 'fork') + self.assertIs(topic.process_context, mock_process_context) mock_auto_lock_renewer.assert_called_once() _, kwargs = mock_auto_lock_renewer.call_args self.assertEqual(kwargs['max_lock_renewal_duration'], 86400) self.assertEqual(kwargs['max_workers'], 2) self.assertEqual(mock_renewer._renew_period, 60) + mock_get_context.assert_called_once_with('fork') + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_submit_processing_task_uses_process_runner_by_default( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + ): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message = MagicMock() + mock_callback = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_get_context.return_value = MagicMock() + mock_message.__str__.return_value = '{"message":"hello"}' + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic._submit_process_task = MagicMock(return_value='process-task') + topic._submit_thread_task = MagicMock(return_value='thread-task') + task = topic._submit_processing_task(mock_message, mock_callback) + + self.assertEqual(task, 'process-task') + topic._submit_process_task.assert_called_once_with('{"message":"hello"}', mock_callback) + topic._submit_thread_task.assert_not_called() + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') - def test_subscribe_settles_completed_tasks_on_receiver_loop(self, mock_service_bus_client, mock_auto_lock_renewer): + def test_submit_processing_task_falls_back_to_thread_when_process_start_fails( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message = MagicMock() + mock_callback = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_get_context.return_value = MagicMock() + mock_message.message_id = 'message-1' + mock_message.__str__.return_value = '{"message":"hello"}' + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic._submit_process_task = MagicMock(side_effect=RuntimeError('process boom')) + topic._submit_thread_task = MagicMock(return_value='thread-task') + + task = topic._submit_processing_task(mock_message, mock_callback) + + self.assertEqual(task, 'thread-task') + topic._submit_thread_task.assert_called_once_with('{"message":"hello"}', mock_callback) + mock_logger.warning.assert_called_once() + warning_args = mock_logger.warning.call_args[0] + self.assertEqual( + warning_args[0], + 'Falling back to thread execution for message %s because process start failed: %s', + ) + self.assertEqual(warning_args[1], 'message-1') + self.assertEqual(str(warning_args[2]), 'process boom') + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_subscribe_settles_completed_tasks_on_receiver_loop( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + ): mock_client = MagicMock() mock_receiver = MagicMock() mock_message = MagicMock() - mock_future = MagicMock() mock_config = MagicMock(connection_string='Endpoint=sb://test/') mock_message._lock_expired = False @@ -41,59 +151,170 @@ def test_subscribe_settles_completed_tasks_on_receiver_loop(self, mock_service_b mock_client.get_topic_sender.return_value = MagicMock() mock_client.get_subscription_receiver.return_value = mock_receiver mock_receiver.receive_messages.side_effect = [[mock_message]] + mock_get_context.return_value = MagicMock() topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) callback = MagicMock() - - def submit_side_effect(fn, *args, **kwargs): - mock_future = cf.Future() - mock_future.set_result(fn(*args, **kwargs)) - return mock_future - - topic.executor.submit = MagicMock(side_effect=submit_side_effect) - with patch.object(QueueMessage, 'data_from', return_value=QueueMessage()): - topic.subscribe(subscription='mock-subscription', callback=callback, max_receivable_messages=1) - - callback.assert_called_once() - mock_auto_lock_renewer.return_value.register.assert_called_once() + topic._submit_processing_task = MagicMock( + return_value=CompletedTask({'success': True, 'error': None}) + ) + + topic.subscribe(subscription='mock-subscription', callback=callback, max_receivable_messages=1) + + topic._submit_processing_task.assert_called_once_with(mock_message, callback) + mock_auto_lock_renewer.return_value.register.assert_called_once_with( + topic.lock_renew_receiver, + mock_message, + max_lock_renewal_duration=topic.max_renewal_duration, + on_lock_renew_failure=topic._handle_lock_renew_failure, + ) mock_receiver.complete_message.assert_called_once_with(mock_message) mock_receiver.abandon_message.assert_not_called() + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_settle_task_abandons_message_when_worker_reports_failure( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_message = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + + mock_message._lock_expired = False + mock_message.message_id = 'message-1' + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_client.get_subscription_receiver.return_value = mock_receiver + mock_get_context.return_value = MagicMock() + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic.receiver = mock_receiver + topic.internal_count = 1 + + topic._settle_task( + CompletedTask({'success': False, 'error': 'worker failure'}), + incoming_message=mock_message, + ) + + mock_receiver.complete_message.assert_not_called() + mock_receiver.abandon_message.assert_called_once_with(mock_message) + mock_logger.error.assert_called_once_with( + 'Processing failed for message %s: %s', + 'message-1', + 'worker failure', + ) + self.assertEqual(topic.internal_count, 0) + + @patch.dict(os.environ, {}, clear=True) @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') - def test_settle_task_logs_error_and_releases_slot(self, mock_service_bus_client, mock_auto_lock_renewer, mock_logger): + def test_settle_task_abandons_message_when_worker_exits_without_result( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): mock_client = MagicMock() mock_receiver = MagicMock() mock_message = MagicMock() - mock_future = MagicMock() mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message._lock_expired = False + mock_message.message_id = 'message-2' + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_client.get_subscription_receiver.return_value = mock_receiver + mock_get_context.return_value = MagicMock() + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic.receiver = mock_receiver + topic.internal_count = 1 + + topic._settle_task( + CompletedTask(error=RuntimeError('worker died')), + incoming_message=mock_message, + ) + + mock_receiver.complete_message.assert_not_called() + mock_receiver.abandon_message.assert_called_once_with(mock_message) + mock_logger.error.assert_called_once_with( + 'Processing failed for message %s: %s', + 'message-2', + 'Callback worker exited before returning a result: worker died', + ) + self.assertEqual(topic.internal_count, 0) + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_settle_task_logs_error_and_releases_slot( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_message = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + + mock_message._lock_expired = False mock_service_bus_client.from_connection_string.return_value = mock_client mock_client.get_topic_sender.return_value = MagicMock() mock_client.get_subscription_receiver.return_value = mock_receiver - mock_future.result.return_value = [True, mock_message] mock_receiver.complete_message.side_effect = Exception('Mocked settlement failure') + mock_get_context.return_value = MagicMock() topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) topic.receiver = mock_receiver topic.internal_count = 1 - topic._settle_task(mock_future, incoming_message=mock_message) + topic._settle_task( + CompletedTask({'success': True, 'error': None}), + incoming_message=mock_message, + ) mock_receiver.complete_message.assert_called_once_with(mock_message) mock_logger.error.assert_called_once_with('Error in settling message: Mocked settlement failure') self.assertEqual(topic.internal_count, 0) + @patch.dict(os.environ, {}, clear=True) @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') - def test_settle_task_skips_expired_message(self, mock_service_bus_client, mock_auto_lock_renewer, mock_logger): + def test_settle_task_skips_expired_message( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): mock_client = MagicMock() mock_receiver = MagicMock() mock_message = MagicMock() - mock_future = MagicMock() mock_config = MagicMock(connection_string='Endpoint=sb://test/') mock_message._lock_expired = True @@ -103,13 +324,16 @@ def test_settle_task_skips_expired_message(self, mock_service_bus_client, mock_a mock_service_bus_client.from_connection_string.return_value = mock_client mock_client.get_topic_sender.return_value = MagicMock() mock_client.get_subscription_receiver.return_value = mock_receiver - mock_future.result.return_value = [True, mock_message] + mock_get_context.return_value = MagicMock() topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) topic.receiver = mock_receiver topic.internal_count = 1 - topic._settle_task(mock_future, incoming_message=mock_message) + topic._settle_task( + CompletedTask({'success': True, 'error': None}), + incoming_message=mock_message, + ) mock_receiver.complete_message.assert_not_called() mock_receiver.abandon_message.assert_not_called() @@ -119,13 +343,18 @@ def test_settle_task_skips_expired_message(self, mock_service_bus_client, mock_a ) self.assertEqual(topic.internal_count, 0) + @patch.dict(os.environ, {}, clear=True) @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') def test_handle_lock_renew_failure_logs_when_sdk_returns_no_error( self, mock_service_bus_client, mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, mock_logger, ): mock_client = MagicMock() @@ -134,6 +363,7 @@ def test_handle_lock_renew_failure_logs_when_sdk_returns_no_error( mock_service_bus_client.from_connection_string.return_value = mock_client mock_client.get_topic_sender.return_value = MagicMock() + mock_get_context.return_value = MagicMock() mock_message.message_id = 'message-1' mock_message.locked_until_utc = '2026-03-17T09:39:28Z' mock_message.auto_renew_error = None @@ -159,13 +389,18 @@ def test_handle_lock_renew_failure_logs_when_sdk_returns_no_error( 'stats=gen0[collections=1, collected=2, uncollectable=0]' ) + @patch.dict(os.environ, {}, clear=True) @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') def test_lock_renew_attempt_logs_memory_snapshot( self, mock_service_bus_client, mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, mock_logger, ): mock_client = MagicMock() @@ -173,6 +408,7 @@ def test_lock_renew_attempt_logs_memory_snapshot( mock_service_bus_client.from_connection_string.return_value = mock_client mock_client.get_topic_sender.return_value = MagicMock() mock_auto_lock_renewer.return_value = MagicMock() + mock_get_context.return_value = MagicMock() topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) topic._get_runtime_snapshot = MagicMock( diff --git a/version.py b/version.py index f0cce3d..3d43659 100644 --- a/version.py +++ b/version.py @@ -1,4 +1,4 @@ -version = '0.2.5.1' -lastCommit = '30e447b527d6744b5ce72e9460f21e22675a3126' -lastCommitShort = '30e4' -buildDate = '2026-03-17' +version = '0.2.5.2' +lastCommit = '7c4a4ad16d5e8825587366d91a17dd3b71598bb6' +lastCommitShort = '7c4a' +buildDate = '2026-03-18' From a00ea6dbb8a967dc21ed882f112da483d68496a3 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Thu, 19 Mar 2026 14:25:15 +0530 Subject: [PATCH 4/5] remove version file --- src/python_ms_core/version.py | 2 +- version.py | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) delete mode 100644 version.py diff --git a/src/python_ms_core/version.py b/src/python_ms_core/version.py index c628dd5..13a85f7 100644 --- a/src/python_ms_core/version.py +++ b/src/python_ms_core/version.py @@ -1 +1 @@ -__version__ = '0.2.5.2' +__version__ = '0.2.5' diff --git a/version.py b/version.py deleted file mode 100644 index 3d43659..0000000 --- a/version.py +++ /dev/null @@ -1,4 +0,0 @@ -version = '0.2.5.2' -lastCommit = '7c4a4ad16d5e8825587366d91a17dd3b71598bb6' -lastCommitShort = '7c4a' -buildDate = '2026-03-18' From 38d452bd53d2c1b922b19bedb1122db15fc34664 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Thu, 19 Mar 2026 15:24:56 +0530 Subject: [PATCH 5/5] Removed unused code --- src/python_ms_core/core/topic/azure_topic.py | 123 +----------------- .../unit_tests/test_topic/test_azure_topic.py | 65 +-------- 2 files changed, 6 insertions(+), 182 deletions(-) diff --git a/src/python_ms_core/core/topic/azure_topic.py b/src/python_ms_core/core/topic/azure_topic.py index dc8eb38..4cf741e 100644 --- a/src/python_ms_core/core/topic/azure_topic.py +++ b/src/python_ms_core/core/topic/azure_topic.py @@ -1,5 +1,4 @@ import json -import gc import logging import multiprocessing as mp import os @@ -13,15 +12,8 @@ from azure.servicebus import AutoLockRenewer import threading -try: - import psutil -except ImportError: # pragma: no cover - dependency exists in the package, but keep logging resilient. - psutil = None - -logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S') logger = logging.getLogger('AzureTopic') -logger.setLevel(logging.INFO) """ @@ -59,14 +51,8 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes self.callback_process_start_method = self._get_process_start_method() self.process_context = self._get_process_context() self.internal_count = 0 - self.max_renewal_duration = self._get_positive_int_from_env( - 'TOPIC_MAX_LOCK_RENEWAL_DURATION_SECONDS', - 86400, - ) # Renew the message upto 1 day - self.lock_renewal_margin = self._get_positive_int_from_env( - 'TOPIC_LOCK_RENEWAL_MARGIN_SECONDS', - 60, - ) + self.max_renewal_duration = 86400 # Renew the message upto 1 day + self.lock_renewal_margin = 60 renewer_max_workers = max(max_concurrent_messages, 2) self.lock_renewal = AutoLockRenewer( max_lock_renewal_duration=self.max_renewal_duration, @@ -76,22 +62,9 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes # The SDK default renews only in the last 10 seconds of the lock window. # Start earlier so long-running jobs have more headroom for scheduler jitter. self.lock_renewal._renew_period = min(self.lock_renewal_margin, self.max_renewal_duration) - self.lock_renew_receiver = _LockRenewLoggingReceiver(self) self.wait_time_for_message = 5 self.thread_lock = threading.Lock() self.pending_tasks = [] - self._process = None - self._prime_runtime_samplers() - logger.info( - 'Configured AzureTopic %s: execution_mode=%s, process_start_method=%s, ' - 'max_lock_renewal_duration=%s seconds, renew_margin=%s seconds, renewer_max_workers=%s', - self.topic_name, - self.callback_execution_mode, - self.callback_process_start_method, - self.max_renewal_duration, - self.lock_renewal_margin, - renewer_max_workers, - ) def publish(self, data: QueueMessage): @@ -135,7 +108,7 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1): for message in messages: execution_task = self._submit_processing_task(message, callback) self.lock_renewal.register( - self.lock_renew_receiver, + self.receiver, message, max_lock_renewal_duration=self.max_renewal_duration, on_lock_renew_failure=self._handle_lock_renew_failure, @@ -283,88 +256,13 @@ def _handle_lock_renew_failure(self, renewable, error): failure_reason = error or getattr(renewable, 'auto_renew_error', None) or 'lock expired before renewal could complete' logger.error( f'Error renewing lock for message {message_id}: {failure_reason}; ' - f'locked_until_utc={getattr(renewable, "locked_until_utc", None)}; ' - f'runtime_snapshot={self._get_runtime_snapshot()}' + f'locked_until_utc={getattr(renewable, "locked_until_utc", None)}' ) @staticmethod def _get_message_id(message): return getattr(message, 'message_id', None) or getattr(message, 'messageId', 'unknown') - def _prime_runtime_samplers(self): - if psutil is None: - return - try: - self._process = psutil.Process(os.getpid()) - self._process.cpu_percent(interval=None) - psutil.cpu_percent(interval=None) - except Exception: # pragma: no cover - best effort diagnostics - self._process = None - - def _get_runtime_snapshot(self): - return f'{self._get_memory_snapshot()}, {self._get_cpu_snapshot()}, {self._get_gc_snapshot()}' - - def _get_memory_snapshot(self): - if self._process is None: - return 'memory=psutil-unavailable' - try: - memory_info = self._process.memory_info() - rss_mb = memory_info.rss / (1024 * 1024) - vms_mb = memory_info.vms / (1024 * 1024) - return f'memory=rss_mb={rss_mb:.2f}, vms_mb={vms_mb:.2f}, num_threads={self._process.num_threads()}' - except Exception as exc: # pragma: no cover - diagnostic fallback - return f'memory=unavailable({exc})' - - def _get_cpu_snapshot(self): - if self._process is None: - return 'cpu=psutil-unavailable' - try: - process_cpu_percent = self._process.cpu_percent(interval=None) - system_cpu_percent = psutil.cpu_percent(interval=None) - return ( - f'cpu=process_percent={process_cpu_percent:.2f}, ' - f'system_percent={system_cpu_percent:.2f}' - ) - except Exception as exc: # pragma: no cover - diagnostic fallback - return f'cpu=unavailable({exc})' - - @staticmethod - def _get_gc_snapshot(): - try: - gc_counts = gc.get_count() - gc_thresholds = gc.get_threshold() - gc_stats = gc.get_stats() - generation_summaries = [] - for generation, stats in enumerate(gc_stats): - generation_summaries.append( - 'gen{generation}[collections={collections}, collected={collected}, uncollectable={uncollectable}]'.format( - generation=generation, - collections=stats.get('collections', 0), - collected=stats.get('collected', 0), - uncollectable=stats.get('uncollectable', 0), - ) - ) - return ( - f'gc=enabled={gc.isenabled()}, counts={gc_counts}, thresholds={gc_thresholds}, ' - f'stats={"; ".join(generation_summaries)}' - ) - except Exception as exc: # pragma: no cover - diagnostic fallback - return f'gc=unavailable({exc})' - - @staticmethod - def _get_positive_int_from_env(name, default): - value = os.environ.get(name) - if value is None: - return default - try: - parsed = int(value) - if parsed > 0: - return parsed - except (TypeError, ValueError): - pass - logger.warning(f'Invalid value for {name}: {value}. Using default {default}.') - return default - @staticmethod def _get_callback_execution_mode(): value = os.environ.get('TOPIC_CALLBACK_EXECUTION_MODE', 'process') @@ -405,19 +303,6 @@ def _get_process_context(self): self.callback_execution_mode = 'thread' return None -class _LockRenewLoggingReceiver: - def __init__(self, topic): - self._topic = topic - - def renew_message_lock(self, renewable): - logger.info( - 'Attempting lock renewal for message %s; locked_until_utc=%s; runtime_snapshot=%s', - self._topic._get_message_id(renewable), - getattr(renewable, 'locked_until_utc', None), - self._topic._get_runtime_snapshot(), - ) - return self._topic.receiver.renew_message_lock(renewable) - def _run_callback_in_subprocess(message_payload, callbackfn, result_connection): try: diff --git a/tests/unit_tests/test_topic/test_azure_topic.py b/tests/unit_tests/test_topic/test_azure_topic.py index f8d9954..6b68937 100644 --- a/tests/unit_tests/test_topic/test_azure_topic.py +++ b/tests/unit_tests/test_topic/test_azure_topic.py @@ -163,7 +163,7 @@ def test_subscribe_settles_completed_tasks_on_receiver_loop( topic._submit_processing_task.assert_called_once_with(mock_message, callback) mock_auto_lock_renewer.return_value.register.assert_called_once_with( - topic.lock_renew_receiver, + mock_receiver, mock_message, max_lock_renewal_duration=topic.max_renewal_duration, on_lock_renew_failure=topic._handle_lock_renew_failure, @@ -369,74 +369,13 @@ def test_handle_lock_renew_failure_logs_when_sdk_returns_no_error( mock_message.auto_renew_error = None topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) - topic._get_runtime_snapshot = MagicMock( - return_value=( - 'memory=rss_mb=128.00, vms_mb=256.00, num_threads=4, ' - 'cpu=process_percent=80.00, system_percent=91.00, ' - 'gc=enabled=True, counts=(1, 2, 3), thresholds=(700, 10, 10), ' - 'stats=gen0[collections=1, collected=2, uncollectable=0]' - ) - ) topic._handle_lock_renew_failure(mock_message, None) mock_logger.error.assert_called_once_with( 'Error renewing lock for message message-1: lock expired before renewal could complete; ' - 'locked_until_utc=2026-03-17T09:39:28Z; ' - 'runtime_snapshot=memory=rss_mb=128.00, vms_mb=256.00, num_threads=4, ' - 'cpu=process_percent=80.00, system_percent=91.00, ' - 'gc=enabled=True, counts=(1, 2, 3), thresholds=(700, 10, 10), ' - 'stats=gen0[collections=1, collected=2, uncollectable=0]' - ) - - @patch.dict(os.environ, {}, clear=True) - @patch('src.python_ms_core.core.topic.azure_topic.logger') - @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') - @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) - @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') - @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') - def test_lock_renew_attempt_logs_memory_snapshot( - self, - mock_service_bus_client, - mock_auto_lock_renewer, - mock_get_all_start_methods, - mock_get_context, - mock_logger, - ): - mock_client = MagicMock() - mock_config = MagicMock(connection_string='Endpoint=sb://test/') - mock_service_bus_client.from_connection_string.return_value = mock_client - mock_client.get_topic_sender.return_value = MagicMock() - mock_auto_lock_renewer.return_value = MagicMock() - mock_get_context.return_value = MagicMock() - - topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) - topic._get_runtime_snapshot = MagicMock( - return_value=( - 'memory=rss_mb=64.00, vms_mb=128.00, num_threads=3, ' - 'cpu=process_percent=55.00, system_percent=70.00, ' - 'gc=enabled=True, counts=(4, 5, 6), thresholds=(700, 10, 10), ' - 'stats=gen0[collections=3, collected=10, uncollectable=0]' - ) - ) - - mock_message = MagicMock() - mock_message.message_id = 'message-2' - mock_message.locked_until_utc = '2026-03-17T09:39:33Z' - topic.receiver = MagicMock() - - topic.lock_renew_receiver.renew_message_lock(mock_message) - - mock_logger.info.assert_any_call( - 'Attempting lock renewal for message %s; locked_until_utc=%s; runtime_snapshot=%s', - 'message-2', - '2026-03-17T09:39:33Z', - 'memory=rss_mb=64.00, vms_mb=128.00, num_threads=3, ' - 'cpu=process_percent=55.00, system_percent=70.00, ' - 'gc=enabled=True, counts=(4, 5, 6), thresholds=(700, 10, 10), ' - 'stats=gen0[collections=3, collected=10, uncollectable=0]', + 'locked_until_utc=2026-03-17T09:39:28Z' ) - topic.receiver.renew_message_lock.assert_called_once_with(mock_message) if __name__ == '__main__':