From d596203c0780fc9b2b9d0d6d7d82526f3da03176 Mon Sep 17 00:00:00 2001 From: Aliaksei Klimau Date: Mon, 30 Mar 2026 17:31:58 +0200 Subject: [PATCH] Added new PulpException replacing existing errors --- pulpcore/exceptions/__init__.py | 2 ++ pulpcore/exceptions/base.py | 49 +++++++++++++++++++++++++++++++++ pulpcore/tasking/tasks.py | 24 ++++++++-------- 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/pulpcore/exceptions/__init__.py b/pulpcore/exceptions/__init__.py index 4c458e43933..5311a74f8a9 100644 --- a/pulpcore/exceptions/__init__.py +++ b/pulpcore/exceptions/__init__.py @@ -16,6 +16,8 @@ ReplicateError, SyncError, PublishError, + TaskConfigurationError, + TaskTimeoutException, ) from .validation import ( DigestValidationError, diff --git a/pulpcore/exceptions/base.py b/pulpcore/exceptions/base.py index 42b9016bfd5..834d6773297 100644 --- a/pulpcore/exceptions/base.py +++ b/pulpcore/exceptions/base.py @@ -325,3 +325,52 @@ class ReplicateError(PulpException): def __str__(self): return f"[{self.error_code}] " + _("Replication failed") + + +class TaskConfigurationError(PulpException): + """ + Raised when a task is incorrectly configured. + """ + + error_code = "PLP0023" + + def __init__(self, task_name, message): + """ + :param task_name: the fully qualified name of the task function + :type task_name: str + :param message: description of the configuration error + :type message: str + """ + self.task_name = task_name + self.message = message + + def __str__(self): + return f"[{self.error_code}] " + _( + "Task type '{task_name}' is misconfigured: {message}" + ).format(task_name=self.task_name, message=self.message) + + +class TaskTimeoutException(PulpException): + """ + Raised when an immediate task exceeds its execution timeout. + """ + + error_code = "PLP0024" + + def __init__(self, task_name, task_pk, timeout_seconds): + """ + :param task_name: the fully qualified name of the task function + :type task_name: str + :param task_pk: the unique task identifier + :type task_pk: str + :param timeout_seconds: the timeout value that was exceeded + :type timeout_seconds: int + """ + self.task_name = task_name + self.task_pk = task_pk + self.timeout_seconds = timeout_seconds + + def __str__(self): + return f"[{self.error_code}] " + _( + "Immediate task {task_pk} (type: {task_name}) timed out after {timeout} seconds." + ).format(task_pk=self.task_pk, task_name=self.task_name, timeout=self.timeout_seconds) diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index da9993b30c5..47d3c369f28 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -21,7 +21,12 @@ get_domain, get_prn, ) -from pulpcore.exceptions import PulpException, InternalErrorException +from pulpcore.exceptions import ( + PulpException, + InternalErrorException, + TaskConfigurationError, + TaskTimeoutException, +) from pulpcore.app.contexts import with_task_context, awith_task_context, x_task_diagnostics_var from pulpcore.constants import ( TASK_FINAL_STATES, @@ -184,11 +189,11 @@ async def aget_task_function(task): func, is_coroutine_fn = _load_function(task) if task.immediate and not is_coroutine_fn: - raise ValueError("Immediate tasks must be async functions.") + raise TaskConfigurationError(task.name, "Immediate tasks must be async functions.") elif not task.immediate: - raise ValueError("Non-immediate tasks can't run in async context.") + raise TaskConfigurationError(task.name, "Non-immediate tasks can't run in async context.") - return _add_timeout_to(func, task.pk) + return _add_timeout_to(func, task.name, task.pk) def get_task_function(task): @@ -202,7 +207,7 @@ def get_task_function(task): func, is_coroutine_fn = _load_function(task) if task.immediate and not is_coroutine_fn: - raise ValueError("Immediate tasks must be async functions.") + raise TaskConfigurationError(task.name, "Immediate tasks must be async functions.") # no sync wrapper required if not is_coroutine_fn: @@ -210,7 +215,7 @@ def get_task_function(task): # async function in sync context requires wrapper if task.immediate: - coro_fn_with_timeout = _add_timeout_to(func, task.pk) + coro_fn_with_timeout = _add_timeout_to(func, task.name, task.pk) return async_to_sync(coro_fn_with_timeout) return async_to_sync(func) @@ -227,16 +232,13 @@ def _load_function(task): return func_with_args, is_coroutine_fn -def _add_timeout_to(coro_fn, task_pk): +def _add_timeout_to(coro_fn, task_name, task_pk): async def _wrapper(): try: return await asyncio.wait_for(coro_fn(), timeout=IMMEDIATE_TIMEOUT) except asyncio.TimeoutError: - msg_template = "Immediate task %s timed out after %s seconds." - error_msg = msg_template % (task_pk, IMMEDIATE_TIMEOUT) - _logger.info(error_msg) - raise RuntimeError(error_msg) + raise TaskTimeoutException(task_name, task_pk, IMMEDIATE_TIMEOUT) return _wrapper