From eb5055f7dd71a0726934c41195a4132b256f2a65 Mon Sep 17 00:00:00 2001 From: micoleaoo Date: Wed, 4 Feb 2026 12:31:47 +0000 Subject: [PATCH] replaced print statements with python logging --- gunicorn_conf.py | 4 +++- tesp_api/service/error.py | 14 ++++--------- tesp_api/service/event_actions.py | 35 +++++++++++++++++-------------- 3 files changed, 26 insertions(+), 27 deletions(-) diff --git a/gunicorn_conf.py b/gunicorn_conf.py index c4e5200..cf288b5 100644 --- a/gunicorn_conf.py +++ b/gunicorn_conf.py @@ -3,6 +3,7 @@ import json import multiprocessing import os +import logging from tesp_api.config import gunicorn_logger @@ -42,4 +43,5 @@ "host": host, "port": port, } -print(json.dumps(log_data)) +logging.basicConfig(level=logging.INFO) +logging.getLogger("gunicorn.config").info(json.dumps(log_data)) diff --git a/tesp_api/service/error.py b/tesp_api/service/error.py index 0fc73d9..69045bb 100644 --- a/tesp_api/service/error.py +++ b/tesp_api/service/error.py @@ -73,7 +73,6 @@ def pulsar_event_handle_error(error: Exception, task_id: ObjectId, logger.warning(f'One of the task [id: {task_id_str}] executors execution finished with error while ' f'executing event [event_name: {event_name}]. This log is here just for reference and can be' f' ignored since it originates from executor. Will try to cancel respective Pulsar job.') - traceback.print_stack() return repo_update_error_task_promise(task_id, TesTaskState.EXECUTOR_ERROR, Nothing)\ .then(lambda ignored: pulsar_cancel_task_promise(task_id, pulsar_operations)) @@ -81,39 +80,34 @@ def pulsar_event_handle_error(error: Exception, task_id: ObjectId, logger.warning(f'Task reached unexpected state [msg: {task_not_found_error}] while executing event ' f'[event_name: {event_name}]. This might be a result of client canceling task. ' f'Execution will not continue') - traceback.print_stack() return Promise(lambda resolve, reject: resolve(None)) case CustomDataLayerError(): - logger.error(f'Data layer error occurred while executing task event [event_name: {event_name}, ' + logger.opt(exception=True).error(f'Data layer error occurred while executing task event [event_name: {event_name}, ' f'task_id: {task_id_str}]. Will try to request Pulsar for job cancellation if possible.') - traceback.print_stack() return pulsar_cancel_task_promise(str(task_id), pulsar_operations) case PulsarLayerConnectionError() as pulsar_layer_connection_error: - logger.error(f'Pulsar connection error occurred while executing task event [event_name: {event_name}, ' + logger.opt(exception=True).error(f'Pulsar connection error occurred while executing task event [event_name: {event_name}, ' f'task_id: {task_id_str}, msg: {pulsar_layer_connection_error}]. Will try to cancel task ' f'and set it up with error message in the syslog attribute.') - traceback.print_stack() syslog: Maybe[str] = Just('Connection error with underlying task executor') return repo_update_error_task_promise(task_id, TesTaskState.SYSTEM_ERROR, syslog) case PulsarOperationsError() as pulsar_operations_error: - logger.warning(f'Pulsar operations error occurred while executing task event [event_name: {event_name}, ' + logger.opt(exception=True).warning(f'Pulsar operations error occurred while executing task event [event_name: {event_name}, ' f'task_id: {task_id_str}, msg: {pulsar_operations_error}]. This indicates uncommon problem ' f'with processing/executing task therefore it will be set up with error message in the ' f'syslog attribute. Will try to cancel task and respective Pulsar job.') - traceback.print_stack() syslog: Maybe[str] = Just(f"Uncommon error occurred while working with underlying task executor. " f"[msg: {pulsar_operations_error.message}]") return pulsar_cancel_task_promise(task_id_str, pulsar_operations)\ .then(lambda ignored: repo_update_error_task_promise(task_id, TesTaskState.SYSTEM_ERROR, syslog)) case _ as unknown_error: - logger.error(f'Unknown error occurred while executing task event [event_name: {event_name}, ' + logger.opt(exception=True).error(f'Unknown error occurred while executing task event [event_name: {event_name}, ' f'task_id: {task_id_str}, msg: {unknown_error}]. Such error was not expected leading to ' f'unrecoverable state. Will try to cancel task and respective Pulsar job.') - traceback.print_stack() syslog: Maybe[str] = Just("Unexpected error occurred while processing/executing the task") return pulsar_cancel_task_promise(task_id_str, pulsar_operations)\ .then(lambda ignored: repo_update_error_task_promise(task_id, TesTaskState.SYSTEM_ERROR, syslog)) diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py index 5f70eaf..5401a56 100644 --- a/tesp_api/service/event_actions.py +++ b/tesp_api/service/event_actions.py @@ -4,6 +4,8 @@ from pathlib import Path import asyncio +from loguru import logger + from pymonad.maybe import Just, Nothing from bson.objectid import ObjectId from pymonad.promise import Promise, _Promise @@ -36,7 +38,7 @@ def handle_queued_task(event: Event) -> None: Dispatches the task to a REST or AMQP specific handler based on Pulsar operations type. """ event_name, payload = event - print(f"Queued task: {payload.get('task_id')}") + logger.info(f"Queued task: {payload.get('task_id')}") match pulsar_service.get_operations(): case PulsarRestOperations() as pulsar_rest_operations: dispatch_event('queued_task_rest', {**payload, 'pulsar_operations': pulsar_rest_operations}) @@ -53,7 +55,7 @@ async def handle_queued_task_rest(event: Event): task_id: ObjectId = payload['task_id'] pulsar_operations: PulsarRestOperations = payload['pulsar_operations'] - print(f"Queued task rest: {task_id}") + logger.debug(f"Queued task rest: {task_id}") await Promise(lambda resolve, reject: resolve(None)) \ .then(lambda nothing: pulsar_operations.setup_job(task_id)) \ @@ -71,7 +73,7 @@ async def handle_queued_task_amqp(event: Event): task_id: ObjectId = payload['task_id'] pulsar_operations: PulsarAmqpOperations = payload['pulsar_operations'] - print(f"Queued task AMQP: {task_id}") + logger.debug(f"Queued task AMQP: {task_id}") try: # Setup job via AMQP @@ -134,7 +136,8 @@ async def setup_data(job_id: ObjectId, return resource_conf, volume_confs, input_confs, output_confs - print(f"Initializing task: {task_id}") + logger.info(f"Initializing task: {task_id}") + await Promise(lambda resolve, reject: resolve(None)) \ .then(lambda nothing: task_repository.update_task_state( task_id, @@ -178,7 +181,7 @@ async def handle_run_task(event: Event) -> None: command_start_time = datetime.datetime.now(datetime.timezone.utc) try: - print(f"Running task: {task_id}") + logger.info(f"Running task: {task_id}") # Set task state to RUNNING task_monad_init = await task_repository.update_task_state( task_id, @@ -191,7 +194,7 @@ async def handle_run_task(event: Event) -> None: current_task_after_init_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id}) current_task_after_init = get_else_throw(current_task_after_init_monad, TaskNotFoundError(task_id)) if current_task_after_init.state == TesTaskState.CANCELED: - print(f"Task {task_id} found CANCELED shortly after RUNNING state update. Aborting handler.") + logger.warning(f"Task {task_id} found CANCELED shortly after RUNNING state update. Aborting handler.") return await update_last_task_log_time( @@ -242,10 +245,10 @@ async def handle_run_task(event: Event) -> None: command_status: dict if run_command_str is None: - print(f"Task {task_id} has no commands to run. Treating as successful no-op.") + logger.warning(f"Task {task_id} has no commands to run. Treating as successful no-op.") command_status = {'stdout': '', 'stderr': 'No commands to run.', 'returncode': 0} else: - print(f"Submitting job to Pulsar for task {task_id}: {run_command_str}") + logger.debug(f"Submitting job to Pulsar for task {task_id}: {run_command_str}") await pulsar_operations.run_job(task_id, run_command_str) command_status = await pulsar_operations.job_status_complete(str(task_id)) @@ -261,17 +264,17 @@ async def handle_run_task(event: Event) -> None: current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id)) if current_task_obj.state == TesTaskState.CANCELED: - print(f"Task {task_id} found CANCELED after job completion polling. Aborting state changes.") + logger.warning(f"Task {task_id} found CANCELED after job completion polling. Aborting state changes.") return if command_status.get('returncode', -1) != 0: - print( + logger.error( f"Task {task_id} executor error (return code: {command_status.get('returncode', -1)}). Setting state to EXECUTOR_ERROR.") await task_repository.update_task_state(task_id, TesTaskState.RUNNING, TesTaskState.EXECUTOR_ERROR) await pulsar_operations.erase_job(task_id) return - print(f"Task {task_id} completed successfully. Setting state to COMPLETE.") + logger.info(f"Task {task_id} completed successfully. Setting state to COMPLETE.") await Promise(lambda resolve, reject: resolve(None)) \ .then(lambda ignored: task_repository.update_task_state( task_id, TesTaskState.RUNNING, TesTaskState.COMPLETE @@ -284,22 +287,22 @@ async def handle_run_task(event: Event) -> None: .then(lambda x: x) except asyncio.CancelledError: - print(f"handle_run_task for task {task_id} was explicitly cancelled (asyncio.CancelledError).") + logger.warning(f"handle_run_task for task {task_id} was explicitly cancelled (asyncio.CancelledError).") await task_repository.update_task_state(task_id, None, TesTaskState.CANCELED) await pulsar_operations.kill_job(task_id) await pulsar_operations.erase_job(task_id) - print(f"Task {task_id} Pulsar job cleanup attempted after asyncio cancellation.") + logger.info(f"Task {task_id} Pulsar job cleanup attempted after asyncio cancellation.") except Exception as error: - print(f"Exception in handle_run_task for task {task_id}: {type(error).__name__} - {error}") + logger.error(f"Exception in handle_run_task for task {task_id}: {type(error).__name__} - {error}") task_state_after_error_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id}) if task_state_after_error_monad.is_just() and task_state_after_error_monad.value.state == TesTaskState.CANCELED: - print( + logger.info( f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to this. No further error processing by handler.") return - print(f"Task {task_id} not CANCELED; proceeding with pulsar_event_handle_error for '{type(error).__name__}'.") + logger.debug(f"Task {task_id} not CANCELED; proceeding with pulsar_event_handle_error for '{type(error).__name__}'.") error_handler_result = pulsar_event_handle_error(error, task_id, event_name, pulsar_operations) if asyncio.iscoroutine(error_handler_result) or isinstance(error_handler_result, _Promise): await error_handler_result