From 54526b72a1383cd67ed80b4457757e99715ee2c1 Mon Sep 17 00:00:00 2001 From: Richard Abrich Date: Thu, 19 Mar 2026 13:53:32 -0400 Subject: [PATCH 1/2] feat: add heartbeat + bot-side reaper for worker reliability Worker writes heartbeat_at every 30s during processing. Bot reaper runs every 60s, detects stale running jobs (heartbeat >90s old), re-queues them with CAS safety. Includes Telegram notifications and startup crash recovery. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/bot/src/index.ts | 137 +++++++++++++++++- apps/worker/src/queue-poller.ts | 65 ++++++++- packages/shared/src/constants.ts | 19 +++ packages/shared/src/types.ts | 1 + .../20260319000000_add_heartbeat_at.sql | 5 + 5 files changed, 224 insertions(+), 3 deletions(-) create mode 100644 supabase/migrations/20260319000000_add_heartbeat_at.sql diff --git a/apps/bot/src/index.ts b/apps/bot/src/index.ts index 3bf9a90..8070e98 100644 --- a/apps/bot/src/index.ts +++ b/apps/bot/src/index.ts @@ -8,8 +8,9 @@ import { execFileSync } from 'child_process' import { Bot, InlineKeyboard, type Context } from 'grammy' -import { JOB_STATUS, type Job, type JobEvent } from '@wright/shared' +import { JOB_STATUS, REAPER_INTERVAL_MS, STALE_HEARTBEAT_MS, STALE_CLAIMED_MS, type Job, type JobEvent } from '@wright/shared' import { + getSupabase, insertJob, getJob, getJobByPrefix, @@ -889,6 +890,137 @@ bot.catch((err) => { console.error('Unhandled bot error:', err) }) +// --------------------------------------------------------------------------- +// Stale job reaper — detects dead workers via heartbeat expiry +// --------------------------------------------------------------------------- + +function startReaper(): void { + const sb = getSupabase() + + setInterval(async () => { + try { + const cutoff = new Date(Date.now() - STALE_HEARTBEAT_MS).toISOString() + + // Find running jobs with stale or missing heartbeats + const { data: staleJobs, error } = await sb + .from('job_queue') + .select('id, attempt, max_attempts, worker_id, telegram_chat_id, task, heartbeat_at, started_at') + .eq('status', 'running') + .or(`heartbeat_at.lt.${cutoff},and(heartbeat_at.is.null,started_at.lt.${cutoff})`) + + if (error) { + console.error('[reaper] Query error:', error.message) + return + } + + if (!staleJobs || staleJobs.length === 0) return + + console.log(`[reaper] Found ${staleJobs.length} stale running job(s)`) + + for (const job of staleJobs) { + if (job.attempt < job.max_attempts) { + // Re-queue for retry + const { data: updated } = await sb + .from('job_queue') + .update({ + status: 'queued', + worker_id: null, + claimed_at: null, + started_at: null, + heartbeat_at: null, + attempt: job.attempt + 1, + error: `Re-queued by reaper: worker stopped responding (attempt ${job.attempt + 1}/${job.max_attempts})`, + }) + .eq('id', job.id) + .eq('status', 'running') // CAS: only if still running + .select('id') + + if (updated && updated.length > 0) { + console.log(`[reaper] Re-queued job ${job.id} (attempt ${job.attempt + 1}/${job.max_attempts})`) + + if (job.telegram_chat_id) { + try { + await bot.api.sendMessage( + job.telegram_chat_id, + `[${job.id.slice(0, 8)}] Worker stopped responding. ` + + `Re-queuing automatically (attempt ${job.attempt + 1}/${job.max_attempts}).`, + { parse_mode: 'HTML' }, + ) + } catch { + // Best effort notification + } + } + + wakeWorker() + } + } else { + // Max attempts exceeded — mark as permanently failed + const { data: updated } = await sb + .from('job_queue') + .update({ + status: 'failed', + completed_at: new Date().toISOString(), + heartbeat_at: null, + error: `Failed: worker stopped responding after ${job.max_attempts} attempts`, + }) + .eq('id', job.id) + .eq('status', 'running') // CAS + + if (updated && updated.length > 0) { + console.log(`[reaper] Job ${job.id} permanently failed (max attempts)`) + + if (job.telegram_chat_id) { + try { + await bot.api.sendMessage( + job.telegram_chat_id, + `[${job.id.slice(0, 8)}] Worker stopped responding. ` + + `Job has failed permanently after ${job.max_attempts} attempts.`, + { parse_mode: 'HTML' }, + ) + } catch { + // Best effort notification + } + } + } + } + } + + // Also check for stale claimed jobs (worker died before transitioning to running) + const claimedCutoff = new Date(Date.now() - STALE_CLAIMED_MS).toISOString() + const { data: staleClaimed } = await sb + .from('job_queue') + .select('id, worker_id') + .eq('status', 'claimed') + .lt('claimed_at', claimedCutoff) + + if (staleClaimed && staleClaimed.length > 0) { + for (const job of staleClaimed) { + await sb + .from('job_queue') + .update({ + status: 'queued', + worker_id: null, + claimed_at: null, + heartbeat_at: null, + error: `Re-queued by reaper: claimed by ${job.worker_id} but never started`, + }) + .eq('id', job.id) + .eq('status', 'claimed') // CAS + + console.log(`[reaper] Reset stale claimed job ${job.id}`) + } + wakeWorker() + } + } catch (err) { + console.error('[reaper] Unexpected error:', err) + } + }, REAPER_INTERVAL_MS) + + console.log( + `[reaper] Stale job reaper started (interval: ${REAPER_INTERVAL_MS}ms, staleness: ${STALE_HEARTBEAT_MS}ms)`, + ) +} + // --------------------------------------------------------------------------- // Startup // --------------------------------------------------------------------------- @@ -901,6 +1033,9 @@ async function main(): Promise { // intentional -- we want a loud failure at startup. startRealtimeBridge() + // Start the stale job reaper — detects dead workers via heartbeat expiry + startReaper() + // Start long polling. This will block until the process is stopped. console.log('Bot is now polling for updates.') await bot.start({ diff --git a/apps/worker/src/queue-poller.ts b/apps/worker/src/queue-poller.ts index de9d412..8b34f17 100644 --- a/apps/worker/src/queue-poller.ts +++ b/apps/worker/src/queue-poller.ts @@ -1,6 +1,6 @@ import { createClient, type SupabaseClient } from '@supabase/supabase-js' import type { Job } from '@wright/shared' -import { POLL_INTERVAL_MS, STALE_CLAIMED_MS, STALE_RUNNING_MS } from '@wright/shared' +import { POLL_INTERVAL_MS, STALE_CLAIMED_MS, STALE_RUNNING_MS, HEARTBEAT_INTERVAL_MS } from '@wright/shared' import { runDevLoop } from './dev-loop.js' // Worker identity — use Fly machine ID if available, otherwise hostname @@ -120,6 +120,7 @@ export async function requeueCurrentJob(): Promise { worker_id: null, claimed_at: null, started_at: null, + heartbeat_at: null, attempt: job.attempt + 1, error: `Re-queued: worker shutdown (SIGTERM), attempt ${job.attempt + 1}/${job.max_attempts}`, }) @@ -141,6 +142,7 @@ export async function requeueCurrentJob(): Promise { .update({ status: 'failed', completed_at: new Date().toISOString(), + heartbeat_at: null, error: `Failed after ${job.max_attempts} attempts (worker restarts)`, }) .eq('id', job.id) @@ -237,13 +239,26 @@ async function processJob(job: Job): Promise { `[queue-poller] Processing job ${job.id} (attempt ${job.attempt})`, ) + // Start heartbeat interval — proves this worker is alive while processing + const heartbeatTimer = setInterval(async () => { + try { + await supabase! + .from('job_queue') + .update({ heartbeat_at: new Date().toISOString() }) + .eq('id', job.id) + } catch (err) { + console.error(`[heartbeat] Failed to update heartbeat for ${job.id}:`, err) + } + }, HEARTBEAT_INTERVAL_MS) + try { - // Mark as running + // Mark as running with initial heartbeat await supabase .from('job_queue') .update({ status: 'running', started_at: new Date().toISOString(), + heartbeat_at: new Date().toISOString(), }) .eq('id', job.id) @@ -297,6 +312,7 @@ async function processJob(job: Job): Promise { }) .eq('id', job.id) } finally { + clearInterval(heartbeatTimer) if (onJobEnd) onJobEnd(job.id) currentJob = null currentAbortController = null @@ -329,11 +345,53 @@ async function startupCleanup(): Promise { status: 'queued', worker_id: null, claimed_at: null, + heartbeat_at: null, }) .eq('id', job.id) } } + // 1b. Reset jobs still 'running' for this worker (interrupted by crash) + const { data: staleRunningThisWorker } = await supabase + .from('job_queue') + .select('id, attempt, max_attempts') + .eq('status', 'running') + .eq('worker_id', WORKER_ID) + + if (staleRunningThisWorker && staleRunningThisWorker.length > 0) { + console.log( + `[queue-poller] Found ${staleRunningThisWorker.length} running job(s) from this worker (crash recovery)`, + ) + for (const job of staleRunningThisWorker) { + if (job.attempt < job.max_attempts) { + await supabase + .from('job_queue') + .update({ + status: 'queued', + worker_id: null, + claimed_at: null, + started_at: null, + heartbeat_at: null, + attempt: job.attempt + 1, + error: `Re-queued: worker crash recovery on startup (attempt ${job.attempt + 1}/${job.max_attempts})`, + }) + .eq('id', job.id) + console.log(`[queue-poller] Re-queued running job ${job.id} (attempt ${job.attempt + 1}/${job.max_attempts})`) + } else { + await supabase + .from('job_queue') + .update({ + status: 'failed', + completed_at: new Date().toISOString(), + heartbeat_at: null, + error: `Failed: worker crashed after ${job.max_attempts} attempts`, + }) + .eq('id', job.id) + console.log(`[queue-poller] Job ${job.id} permanently failed (max attempts exceeded)`) + } + } + } + // 2. Reset jobs claimed by ANY worker for too long const staleClaimedCutoff = new Date( Date.now() - STALE_CLAIMED_MS, @@ -355,6 +413,7 @@ async function startupCleanup(): Promise { status: 'queued', worker_id: null, claimed_at: null, + heartbeat_at: null, error: `Reset: claimed by ${job.worker_id} but never started`, }) .eq('id', job.id) @@ -384,6 +443,7 @@ async function startupCleanup(): Promise { worker_id: null, claimed_at: null, started_at: null, + heartbeat_at: null, attempt: job.attempt + 1, error: `Re-queued: abandoned running job (attempt ${job.attempt + 1}/${job.max_attempts})`, }) @@ -394,6 +454,7 @@ async function startupCleanup(): Promise { .update({ status: 'failed', completed_at: new Date().toISOString(), + heartbeat_at: null, error: `Failed: abandoned after ${job.max_attempts} attempts`, }) .eq('id', job.id) diff --git a/packages/shared/src/constants.ts b/packages/shared/src/constants.ts index 1cd1fbf..f30b142 100644 --- a/packages/shared/src/constants.ts +++ b/packages/shared/src/constants.ts @@ -50,6 +50,25 @@ export const STALE_CLAIMED_MS = 2 * 60 * 1000 // 2 minutes */ export const STALE_RUNNING_MS = 30 * 60 * 1000 // 30 minutes +/** + * How often the worker sends a heartbeat while processing a job (ms). + */ +export const HEARTBEAT_INTERVAL_MS = 30_000 // 30 seconds + +/** + * How long a running job can go without a heartbeat before being + * considered stale (ms). Must be > HEARTBEAT_INTERVAL_MS. + * + * Set to 3x the heartbeat interval to tolerate transient delays + * (slow DB writes, GC pauses, etc.) + */ +export const STALE_HEARTBEAT_MS = 90_000 // 90 seconds + +/** + * How often the bot checks for stale running jobs (ms). + */ +export const REAPER_INTERVAL_MS = 60_000 // 60 seconds + /** * Supabase table names. */ diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index a405d3c..ce37698 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -64,6 +64,7 @@ export interface Job { claimed_at?: string started_at?: string completed_at?: string + heartbeat_at?: string // Error details on failure error?: string diff --git a/supabase/migrations/20260319000000_add_heartbeat_at.sql b/supabase/migrations/20260319000000_add_heartbeat_at.sql new file mode 100644 index 0000000..f9ff7c1 --- /dev/null +++ b/supabase/migrations/20260319000000_add_heartbeat_at.sql @@ -0,0 +1,5 @@ +-- Add heartbeat_at column for worker liveness detection. +-- Workers update this timestamp every 30s while processing a job. +-- The bot-side reaper checks for stale heartbeats every 60s and +-- re-queues jobs whose workers have stopped responding. +ALTER TABLE job_queue ADD COLUMN heartbeat_at TIMESTAMPTZ; From 3e8ae78ee81dd8ae89b6fabb14fc6b9cc01f9417 Mon Sep 17 00:00:00 2001 From: Richard Abrich Date: Thu, 19 Mar 2026 13:58:10 -0400 Subject: [PATCH 2/2] fix: add missing .select() to reaper update query to fix TS error The Supabase `.update()` without `.select()` returns `null` for data, causing TypeScript to infer it as `never`. Adding `.select('id')` matches the pattern used in the re-queue path above. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/bot/src/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/bot/src/index.ts b/apps/bot/src/index.ts index 8070e98..12ece95 100644 --- a/apps/bot/src/index.ts +++ b/apps/bot/src/index.ts @@ -965,6 +965,7 @@ function startReaper(): void { }) .eq('id', job.id) .eq('status', 'running') // CAS + .select('id') if (updated && updated.length > 0) { console.log(`[reaper] Job ${job.id} permanently failed (max attempts)`)