Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pulpcore/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
ReplicateError,
SyncError,
PublishError,
TaskConfigurationError,
TaskTimeoutException,
)
from .validation import (
DigestValidationError,
Expand Down
49 changes: 49 additions & 0 deletions pulpcore/exceptions/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,52 @@ class ReplicateError(PulpException):

def __str__(self):
return f"[{self.error_code}] " + _("Replication failed")


class TaskConfigurationError(PulpException):
"""
Raised when a task is incorrectly configured.
"""

error_code = "PLP0023"

def __init__(self, task_name, message):
"""
:param task_name: the fully qualified name of the task function
:type task_name: str
:param message: description of the configuration error
:type message: str
"""
self.task_name = task_name
self.message = message

def __str__(self):
return f"[{self.error_code}] " + _(
"Task type '{task_name}' is misconfigured: {message}"
).format(task_name=self.task_name, message=self.message)


class TaskTimeoutException(PulpException):
"""
Raised when an immediate task exceeds its execution timeout.
"""

error_code = "PLP0024"

def __init__(self, task_name, task_pk, timeout_seconds):
"""
:param task_name: the fully qualified name of the task function
:type task_name: str
:param task_pk: the unique task identifier
:type task_pk: str
:param timeout_seconds: the timeout value that was exceeded
:type timeout_seconds: int
"""
self.task_name = task_name
self.task_pk = task_pk
self.timeout_seconds = timeout_seconds

def __str__(self):
return f"[{self.error_code}] " + _(
"Immediate task {task_pk} (type: {task_name}) timed out after {timeout} seconds."
).format(task_pk=self.task_pk, task_name=self.task_name, timeout=self.timeout_seconds)
24 changes: 13 additions & 11 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
get_domain,
get_prn,
)
from pulpcore.exceptions import PulpException, InternalErrorException
from pulpcore.exceptions import (
PulpException,
InternalErrorException,
TaskConfigurationError,
TaskTimeoutException,
)
from pulpcore.app.contexts import with_task_context, awith_task_context, x_task_diagnostics_var
from pulpcore.constants import (
TASK_FINAL_STATES,
Expand Down Expand Up @@ -184,11 +189,11 @@ async def aget_task_function(task):
func, is_coroutine_fn = _load_function(task)

if task.immediate and not is_coroutine_fn:
raise ValueError("Immediate tasks must be async functions.")
raise TaskConfigurationError(task.name, "Immediate tasks must be async functions.")
elif not task.immediate:
raise ValueError("Non-immediate tasks can't run in async context.")
raise TaskConfigurationError(task.name, "Non-immediate tasks can't run in async context.")

return _add_timeout_to(func, task.pk)
return _add_timeout_to(func, task.name, task.pk)


def get_task_function(task):
Expand All @@ -202,15 +207,15 @@ def get_task_function(task):
func, is_coroutine_fn = _load_function(task)

if task.immediate and not is_coroutine_fn:
raise ValueError("Immediate tasks must be async functions.")
raise TaskConfigurationError(task.name, "Immediate tasks must be async functions.")

# no sync wrapper required
if not is_coroutine_fn:
return func

# async function in sync context requires wrapper
if task.immediate:
coro_fn_with_timeout = _add_timeout_to(func, task.pk)
coro_fn_with_timeout = _add_timeout_to(func, task.name, task.pk)
return async_to_sync(coro_fn_with_timeout)
return async_to_sync(func)

Expand All @@ -227,16 +232,13 @@ def _load_function(task):
return func_with_args, is_coroutine_fn


def _add_timeout_to(coro_fn, task_pk):
def _add_timeout_to(coro_fn, task_name, task_pk):

async def _wrapper():
try:
return await asyncio.wait_for(coro_fn(), timeout=IMMEDIATE_TIMEOUT)
except asyncio.TimeoutError:
msg_template = "Immediate task %s timed out after %s seconds."
error_msg = msg_template % (task_pk, IMMEDIATE_TIMEOUT)
_logger.info(error_msg)
raise RuntimeError(error_msg)
raise TaskTimeoutException(task_name, task_pk, IMMEDIATE_TIMEOUT)

return _wrapper

Expand Down
Loading