Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 39 additions & 32 deletions workers/api-deployment/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,30 @@ def _unified_api_execution(
Returns:
Execution result dictionary
"""
# Set up execution context - exit early if setup fails
organization_id = schema_name
try:
# Set up execution context using shared utilities
organization_id = schema_name
config, api_client = WorkerExecutionContext.setup_execution_context(
_, api_client = WorkerExecutionContext.setup_execution_context(
organization_id, execution_id, workflow_id
)
except Exception as e:
logger.error(f"Failed to setup execution context: {e}")
# Clean up StateStore since setup_execution_context may have
# partially populated it before failing
try:
from shared.infrastructure.context import StateStore

StateStore.clear_all()
except Exception:
logger.debug("StateStore cleanup also failed during early exit")
return {
"execution_id": execution_id,
"status": "ERROR",
"error": str(e),
"files_processed": 0,
}

try:
# Log task start with standardized format
WorkerExecutionContext.log_task_start(
f"unified_api_execution_{task_type}",
Expand Down Expand Up @@ -216,12 +233,12 @@ def _unified_api_execution(
schema_name=organization_id,
workflow_id=workflow_id,
execution_id=execution_id,
hash_values_of_files=converted_files, # Changed parameter name
hash_values_of_files=converted_files,
scheduled=scheduled,
execution_mode=execution_mode,
pipeline_id=pipeline_id,
use_file_history=use_file_history,
task_id=task_instance.request.id, # Add required task_id
task_id=task_instance.request.id,
**kwargs,
)

Expand All @@ -233,25 +250,14 @@ def _unified_api_execution(
f"files_processed={len(converted_files)}",
)

# CRITICAL: Clean up StateStore to prevent data leaks between tasks
try:
from shared.infrastructure.context import StateStore

StateStore.clear_all()
logger.debug("🧹 Cleaned up StateStore context to prevent data leaks")
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup StateStore context: {cleanup_error}")

return result

except Exception as e:
logger.error(f"API execution failed: {e}")

# Handle execution error with standardized pattern
if "api_client" in locals():
WorkerExecutionContext.handle_execution_error(
api_client, execution_id, e, logger, f"api_execution_{task_type}"
)
WorkerExecutionContext.handle_execution_error(
api_client, execution_id, e, logger, f"api_execution_{task_type}"
)

# Log completion with error
WorkerExecutionContext.log_task_completion(
Expand All @@ -261,26 +267,27 @@ def _unified_api_execution(
f"error={str(e)}",
)

# CRITICAL: Clean up StateStore to prevent data leaks between tasks (error path)
try:
from shared.infrastructure.context import StateStore

StateStore.clear_all()
logger.debug(
"🧹 Cleaned up StateStore context to prevent data leaks (error path)"
)
except Exception as cleanup_error:
logger.warning(
f"Failed to cleanup StateStore context on error: {cleanup_error}"
)

return {
"execution_id": execution_id,
"status": "ERROR",
"error": str(e),
"files_processed": 0,
}

finally:
try:
api_client.close()
except Exception as e:
logger.warning("api_client.close() failed during cleanup: %s", e)

# Clean up StateStore to prevent data leaks between tasks
try:
from shared.infrastructure.context import StateStore

StateStore.clear_all()
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup StateStore context: {cleanup_error}")


@app.task(
bind=True,
Expand Down
Loading
Loading