Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion gunicorn_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import multiprocessing
import os
import logging

from tesp_api.config import gunicorn_logger

Expand Down Expand Up @@ -42,4 +43,5 @@
"host": host,
"port": port,
}
print(json.dumps(log_data))
logging.basicConfig(level=logging.INFO)
logging.getLogger("gunicorn.config").info(json.dumps(log_data))
14 changes: 4 additions & 10 deletions tesp_api/service/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,47 +73,41 @@ def pulsar_event_handle_error(error: Exception, task_id: ObjectId,
logger.warning(f'One of the task [id: {task_id_str}] executors execution finished with error while '
f'executing event [event_name: {event_name}]. This log is here just for reference and can be'
f' ignored since it originates from executor. Will try to cancel respective Pulsar job.')
traceback.print_stack()
return repo_update_error_task_promise(task_id, TesTaskState.EXECUTOR_ERROR, Nothing)\
.then(lambda ignored: pulsar_cancel_task_promise(task_id, pulsar_operations))

case TaskNotFoundError() as task_not_found_error:
logger.warning(f'Task reached unexpected state [msg: {task_not_found_error}] while executing event '
f'[event_name: {event_name}]. This might be a result of client canceling task. '
f'Execution will not continue')
traceback.print_stack()
return Promise(lambda resolve, reject: resolve(None))

case CustomDataLayerError():
logger.error(f'Data layer error occurred while executing task event [event_name: {event_name}, '
logger.opt(exception=True).error(f'Data layer error occurred while executing task event [event_name: {event_name}, '
f'task_id: {task_id_str}]. Will try to request Pulsar for job cancellation if possible.')
traceback.print_stack()
return pulsar_cancel_task_promise(str(task_id), pulsar_operations)

case PulsarLayerConnectionError() as pulsar_layer_connection_error:
logger.error(f'Pulsar connection error occurred while executing task event [event_name: {event_name}, '
logger.opt(exception=True).error(f'Pulsar connection error occurred while executing task event [event_name: {event_name}, '
f'task_id: {task_id_str}, msg: {pulsar_layer_connection_error}]. Will try to cancel task '
f'and set it up with error message in the syslog attribute.')
traceback.print_stack()
syslog: Maybe[str] = Just('Connection error with underlying task executor')
return repo_update_error_task_promise(task_id, TesTaskState.SYSTEM_ERROR, syslog)

case PulsarOperationsError() as pulsar_operations_error:
logger.warning(f'Pulsar operations error occurred while executing task event [event_name: {event_name}, '
logger.opt(exception=True).warning(f'Pulsar operations error occurred while executing task event [event_name: {event_name}, '
f'task_id: {task_id_str}, msg: {pulsar_operations_error}]. This indicates uncommon problem '
f'with processing/executing task therefore it will be set up with error message in the '
f'syslog attribute. Will try to cancel task and respective Pulsar job.')
traceback.print_stack()
syslog: Maybe[str] = Just(f"Uncommon error occurred while working with underlying task executor. "
f"[msg: {pulsar_operations_error.message}]")
return pulsar_cancel_task_promise(task_id_str, pulsar_operations)\
.then(lambda ignored: repo_update_error_task_promise(task_id, TesTaskState.SYSTEM_ERROR, syslog))

case _ as unknown_error:
logger.error(f'Unknown error occurred while executing task event [event_name: {event_name}, '
logger.opt(exception=True).error(f'Unknown error occurred while executing task event [event_name: {event_name}, '
f'task_id: {task_id_str}, msg: {unknown_error}]. Such error was not expected leading to '
f'unrecoverable state. Will try to cancel task and respective Pulsar job.')
traceback.print_stack()
syslog: Maybe[str] = Just("Unexpected error occurred while processing/executing the task")
return pulsar_cancel_task_promise(task_id_str, pulsar_operations)\
.then(lambda ignored: repo_update_error_task_promise(task_id, TesTaskState.SYSTEM_ERROR, syslog))
35 changes: 19 additions & 16 deletions tesp_api/service/event_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from pathlib import Path
import asyncio

from loguru import logger

from pymonad.maybe import Just, Nothing
from bson.objectid import ObjectId
from pymonad.promise import Promise, _Promise
Expand Down Expand Up @@ -36,7 +38,7 @@ def handle_queued_task(event: Event) -> None:
Dispatches the task to a REST or AMQP specific handler based on Pulsar operations type.
"""
event_name, payload = event
print(f"Queued task: {payload.get('task_id')}")
logger.info(f"Queued task: {payload.get('task_id')}")
match pulsar_service.get_operations():
case PulsarRestOperations() as pulsar_rest_operations:
dispatch_event('queued_task_rest', {**payload, 'pulsar_operations': pulsar_rest_operations})
Expand All @@ -53,7 +55,7 @@ async def handle_queued_task_rest(event: Event):
task_id: ObjectId = payload['task_id']
pulsar_operations: PulsarRestOperations = payload['pulsar_operations']

print(f"Queued task rest: {task_id}")
logger.debug(f"Queued task rest: {task_id}")

await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda nothing: pulsar_operations.setup_job(task_id)) \
Expand All @@ -71,7 +73,7 @@ async def handle_queued_task_amqp(event: Event):
task_id: ObjectId = payload['task_id']
pulsar_operations: PulsarAmqpOperations = payload['pulsar_operations']

print(f"Queued task AMQP: {task_id}")
logger.debug(f"Queued task AMQP: {task_id}")

try:
# Setup job via AMQP
Expand Down Expand Up @@ -134,7 +136,8 @@ async def setup_data(job_id: ObjectId,

return resource_conf, volume_confs, input_confs, output_confs

print(f"Initializing task: {task_id}")
logger.info(f"Initializing task: {task_id}")

await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda nothing: task_repository.update_task_state(
task_id,
Expand Down Expand Up @@ -178,7 +181,7 @@ async def handle_run_task(event: Event) -> None:
command_start_time = datetime.datetime.now(datetime.timezone.utc)

try:
print(f"Running task: {task_id}")
logger.info(f"Running task: {task_id}")
# Set task state to RUNNING
task_monad_init = await task_repository.update_task_state(
task_id,
Expand All @@ -191,7 +194,7 @@ async def handle_run_task(event: Event) -> None:
current_task_after_init_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
current_task_after_init = get_else_throw(current_task_after_init_monad, TaskNotFoundError(task_id))
if current_task_after_init.state == TesTaskState.CANCELED:
print(f"Task {task_id} found CANCELED shortly after RUNNING state update. Aborting handler.")
logger.warning(f"Task {task_id} found CANCELED shortly after RUNNING state update. Aborting handler.")
return

await update_last_task_log_time(
Expand Down Expand Up @@ -242,10 +245,10 @@ async def handle_run_task(event: Event) -> None:
command_status: dict

if run_command_str is None:
print(f"Task {task_id} has no commands to run. Treating as successful no-op.")
logger.warning(f"Task {task_id} has no commands to run. Treating as successful no-op.")
command_status = {'stdout': '', 'stderr': 'No commands to run.', 'returncode': 0}
else:
print(f"Submitting job to Pulsar for task {task_id}: {run_command_str}")
logger.debug(f"Submitting job to Pulsar for task {task_id}: {run_command_str}")
await pulsar_operations.run_job(task_id, run_command_str)
command_status = await pulsar_operations.job_status_complete(str(task_id))

Expand All @@ -261,17 +264,17 @@ async def handle_run_task(event: Event) -> None:
current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id))

if current_task_obj.state == TesTaskState.CANCELED:
print(f"Task {task_id} found CANCELED after job completion polling. Aborting state changes.")
logger.warning(f"Task {task_id} found CANCELED after job completion polling. Aborting state changes.")
return

if command_status.get('returncode', -1) != 0:
print(
logger.error(
f"Task {task_id} executor error (return code: {command_status.get('returncode', -1)}). Setting state to EXECUTOR_ERROR.")
await task_repository.update_task_state(task_id, TesTaskState.RUNNING, TesTaskState.EXECUTOR_ERROR)
await pulsar_operations.erase_job(task_id)
return

print(f"Task {task_id} completed successfully. Setting state to COMPLETE.")
logger.info(f"Task {task_id} completed successfully. Setting state to COMPLETE.")
await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda ignored: task_repository.update_task_state(
task_id, TesTaskState.RUNNING, TesTaskState.COMPLETE
Expand All @@ -284,22 +287,22 @@ async def handle_run_task(event: Event) -> None:
.then(lambda x: x)

except asyncio.CancelledError:
print(f"handle_run_task for task {task_id} was explicitly cancelled (asyncio.CancelledError).")
logger.warning(f"handle_run_task for task {task_id} was explicitly cancelled (asyncio.CancelledError).")
await task_repository.update_task_state(task_id, None, TesTaskState.CANCELED)
await pulsar_operations.kill_job(task_id)
await pulsar_operations.erase_job(task_id)
print(f"Task {task_id} Pulsar job cleanup attempted after asyncio cancellation.")
logger.info(f"Task {task_id} Pulsar job cleanup attempted after asyncio cancellation.")

except Exception as error:
print(f"Exception in handle_run_task for task {task_id}: {type(error).__name__} - {error}")
logger.error(f"Exception in handle_run_task for task {task_id}: {type(error).__name__} - {error}")

task_state_after_error_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
if task_state_after_error_monad.is_just() and task_state_after_error_monad.value.state == TesTaskState.CANCELED:
print(
logger.info(
f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to this. No further error processing by handler.")
return

print(f"Task {task_id} not CANCELED; proceeding with pulsar_event_handle_error for '{type(error).__name__}'.")
logger.debug(f"Task {task_id} not CANCELED; proceeding with pulsar_event_handle_error for '{type(error).__name__}'.")
error_handler_result = pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)
if asyncio.iscoroutine(error_handler_result) or isinstance(error_handler_result, _Promise):
await error_handler_result
Expand Down
Loading