Skip to content

Commit fac54b2

Browse files
authored
Merge pull request #82 from TaskarCenterAtUW/feature-stop-at-one-multiprocessing
Feature stop at one multiprocessing
2 parents 332f7ac + 771fc2b commit fac54b2

3 files changed

Lines changed: 21 additions & 6 deletions

File tree

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
fastapi==0.88.0
22
pydantic==1.10.4
3-
python-ms-core==0.0.24
3+
python-ms-core==0.0.25
44
uvicorn==0.20.0
55
html_testRunner==1.2.1
66
geopandas==0.14.4

src/osw_validator.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,16 @@ def process(message) -> None:
4949
self.validate(received_message=upload_message)
5050

5151
self.listening_topic.subscribe(subscription=self.subscription_name, callback=process, max_receivable_messages=self._settings.max_receivable_messages)
52+
if self._settings.max_receivable_messages > 0:
53+
logger.info('Listener finished processing available messages; stopping server/container.')
54+
self._stop_server_and_container(delay_seconds=2)
5255

5356
def validate(self, received_message: Upload):
5457
tdei_record_id: str = ''
5558
status_sent = False
5659
try:
5760
tdei_record_id = received_message.message_id
58-
logger.info(f'Received message for : {tdei_record_id} Message received for OSW validation !')
61+
logger.info(f'Received message for : {tdei_record_id} Message received for OSW validation! Core version: {Core.__version__}')
5962

6063
if received_message.data.file_upload_path is None:
6164
error_msg = 'Request does not have valid file path specified.'
@@ -86,10 +89,9 @@ def validate(self, received_message: Upload):
8689
status_sent = True
8790
finally:
8891
if status_sent:
89-
logger.info('Triggering server shutdown after status send.')
92+
logger.info('Validation status sent for %s.', tdei_record_id)
9093
else:
91-
logger.warning('Server shutdown skipped because status was not sent.')
92-
self._stop_server_and_container(delay_seconds=2)
94+
logger.warning('Validation status was not sent for %s.', tdei_record_id)
9395

9496
def send_status(self, result: ValidationResult, upload_message: Upload):
9597
upload_message.data.success = result.is_valid

tests/unit_tests/test_service.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import unittest
2-
from unittest.mock import patch, MagicMock
2+
from unittest.mock import patch, MagicMock, ANY
33
from src.osw_validator import OSWValidator
44
from src.models.queue_message_content import Upload
55
from src.models.queue_message_content import ValidationResult
@@ -16,6 +16,7 @@ def setUp(self, mock_core, mock_settings, mock_thread):
1616
mock_settings.return_value.event_bus.upload_topic = 'test_request_topic'
1717
mock_settings.return_value.event_bus.validation_topic = 'test_response_topic'
1818
mock_settings.return_value.max_concurrent_messages = 10
19+
mock_settings.return_value.max_receivable_messages = -1
1920
mock_settings.return_value.get_download_directory.return_value = '/tmp'
2021
mock_settings.return_value.event_bus.container_name = 'test_container'
2122

@@ -63,6 +64,18 @@ def test_subscribe_with_valid_message(self, mock_upload, mock_queue_message):
6364
# Assert
6465
self.service.validate.assert_called_once_with(received_message=mock_upload_message)
6566

67+
def test_start_listening_stops_container_after_subscribe_returns(self):
68+
self.service._settings.max_receivable_messages = 1
69+
70+
self.service.start_listening()
71+
72+
self.service.listening_topic.subscribe.assert_called_once_with(
73+
subscription=self.service.subscription_name,
74+
callback=ANY,
75+
max_receivable_messages=1,
76+
)
77+
self.service._stop_server_and_container.assert_called_once_with(delay_seconds=2)
78+
6679
@patch('src.osw_validator.Validation')
6780
def test_validate_with_valid_file_path(self, mock_validation):
6881
# Arrange

0 commit comments

Comments
 (0)