-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
119 lines (100 loc) · 4.39 KB
/
worker.py
File metadata and controls
119 lines (100 loc) · 4.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import asyncio
import json
import logging
import os
import random
from datetime import timedelta
import asyncpg
from pgqueuer import PgQueuer
from pgqueuer import errors
from pgqueuer.db import AsyncpgDriver
from pgqueuer.executors import RetryWithBackoffEntrypointExecutor, EntrypointExecutorParameters
from pgqueuer.models import Job
# Configure logging: one root-level config so both this module's logger and
# any library loggers emit timestamped records to stderr.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Database connection string. The default uses the SQLAlchemy-style
# "postgresql+psycopg://" scheme (presumably shared with a web app's config —
# confirm against the deployment environment).
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql+psycopg://fastapi_user:fastapi_password@localhost:5432/fastapi_db",
)
# asyncpg does not understand the "+psycopg" driver suffix, so strip it down
# to a plain "postgresql://" URL before connecting.
ASYNC_DATABASE_URL = DATABASE_URL.replace("postgresql+psycopg://", "postgresql://")
# Mock Audit API
async def mock_audit_api(payload: dict) -> bool:
    """Simulate a call to an external audit service.

    Sleeps briefly to mimic network latency, then reports failure on
    roughly 30% of calls so the queue's retry/DLQ handling can be
    exercised end-to-end.

    Returns:
        True on simulated success, False on simulated failure.
    """
    # Pretend we are waiting on a real HTTP round-trip.
    await asyncio.sleep(0.1)

    # Roll the dice: ~3 in 10 calls are reported as failed.
    call_failed = random.random() < 0.3
    if call_failed:
        logger.error(f"Mock Audit API failed for payload: {payload}")
        return False

    logger.info(f"Mock Audit API success for user_id: {payload.get('user_id')}")
    return True
async def main():
    """Main worker function.

    Connects to Postgres via asyncpg, registers the "audit_events"
    entrypoint with a retry-with-backoff executor, and runs the
    pgqueuer polling loop until cancelled.
    """
    logger.info("Starting pgqueuer worker for audit events...")
    # Single dedicated connection for the worker; pgqueuer drives it
    # through its AsyncpgDriver adapter.
    connection = await asyncpg.connect(ASYNC_DATABASE_URL)
    driver = AsyncpgDriver(connection)
    pgq = PgQueuer(driver)

    # Custom executor factory with retry configuration and enhanced logging.
    # Subclasses the library executor only to add loud log lines when a job
    # is finally given up on (all retries spent, or total time exceeded).
    class EnhancedRetryExecutor(RetryWithBackoffEntrypointExecutor):
        async def execute(self, job: Job, context) -> None:
            try:
                # Delegate the actual retry/backoff behavior to the library.
                await super().execute(job, context)
            except errors.MaxRetriesExceeded as e:
                # Log final failure after all retries exhausted, then
                # re-raise so pgqueuer records the terminal failure.
                payload = json.loads(job.payload.decode())
                logger.error(
                    f"🚨 FINAL FAILURE - Job {job.id} failed permanently after {self.max_attempts} attempts. "
                    f"Payload: {payload}. Job moved to DLQ/failed status."
                )
                raise
            except errors.MaxTimeExceeded as e:
                # Log final failure due to the overall time limit (max_time),
                # independent of how many attempts were made.
                payload = json.loads(job.payload.decode())
                logger.error(
                    f"🚨 TIMEOUT FAILURE - Job {job.id} exceeded max time limit. "
                    f"Payload: {payload}. Job moved to DLQ/failed status."
                )
                raise

    def retry_executor_factory(params: EntrypointExecutorParameters):
        # NOTE(review): initial_delay is passed as a float while max_delay /
        # max_time are timedeltas — confirm against the pgqueuer executor
        # signature that both forms are accepted.
        return EnhancedRetryExecutor(
            parameters=params,
            max_attempts=3,  # Retry up to 3 times
            initial_delay=5.0,  # Start with 5 second delay (float)
            max_delay=timedelta(seconds=60),  # Max delay of 1 minute
            max_time=timedelta(minutes=10)  # Give up after 10 minutes total
        )

    # retry_timer presumably re-dispatches jobs stuck in "picked" state after
    # 30s (e.g. after a worker crash) — verify against pgqueuer docs.
    @pgq.entrypoint("audit_events", retry_timer=timedelta(seconds=30), executor_factory=retry_executor_factory)
    async def handle_audit_event(job: Job) -> None:
        """Process audit job by calling mock audit API"""
        try:
            # Decode the job payload (stored as JSON bytes in the queue table).
            payload = json.loads(job.payload.decode())
            logger.info(f"Processing audit job: {payload} (job_id: {job.id})")

            # Call mock audit API; a False result is converted into an
            # exception so the retry executor treats it as a failed attempt.
            success = await mock_audit_api(payload)
            if not success:
                logger.warning(f"Audit API failed for job_id: {job.id}, will retry")
                raise Exception("Audit API call failed")

            logger.info(
                f"Successfully processed audit for user_id: {payload.get('user_id')} (job_id: {job.id})"
            )
        except Exception as e:
            # Enhanced logging for failed jobs - this will show before retry.
            # Re-raising hands control back to the executor, which schedules
            # the retry (or escalates to the final-failure paths above).
            logger.error(f"Failed to process audit job {job.id}: {e} - Job will be retried")
            # Job will be retried automatically by pgqueuer
            raise

    # Start the worker with optimized settings
    logger.info("Starting pgqueuer worker...")
    await pgq.run(
        dequeue_timeout=timedelta(seconds=5),  # Check for new jobs every 5 seconds (faster polling)
        batch_size=5,  # Process up to 5 jobs at once
        max_concurrent_tasks=10  # Allow up to 10 concurrent jobs
    )
# Script entry point: run the worker's event loop until interrupted.
if __name__ == "__main__":
    asyncio.run(main())