diff --git a/apps/bot/src/index.ts b/apps/bot/src/index.ts
index 3bf9a90..12ece95 100644
--- a/apps/bot/src/index.ts
+++ b/apps/bot/src/index.ts
@@ -8,8 +8,9 @@
import { execFileSync } from 'child_process'
import { Bot, InlineKeyboard, type Context } from 'grammy'
-import { JOB_STATUS, type Job, type JobEvent } from '@wright/shared'
+import { JOB_STATUS, REAPER_INTERVAL_MS, STALE_HEARTBEAT_MS, STALE_CLAIMED_MS, type Job, type JobEvent } from '@wright/shared'
import {
+ getSupabase,
insertJob,
getJob,
getJobByPrefix,
@@ -889,6 +890,138 @@ bot.catch((err) => {
console.error('Unhandled bot error:', err)
})
+// ---------------------------------------------------------------------------
+// Stale job reaper — detects dead workers via heartbeat expiry and resets stale claimed jobs
+// ---------------------------------------------------------------------------
+
+function startReaper(): void {
+ const sb = getSupabase()
+
+ setInterval(async () => {
+ try {
+ const cutoff = new Date(Date.now() - STALE_HEARTBEAT_MS).toISOString()
+
+      // Find running jobs whose heartbeat is stale, or that never heartbeated and started before the cutoff
+ const { data: staleJobs, error } = await sb
+ .from('job_queue')
+ .select('id, attempt, max_attempts, worker_id, telegram_chat_id, task, heartbeat_at, started_at')
+ .eq('status', 'running')
+ .or(`heartbeat_at.lt.${cutoff},and(heartbeat_at.is.null,started_at.lt.${cutoff})`)
+
+ if (error) {
+ console.error('[reaper] Query error:', error.message)
+ return
+ }
+
+ if (!staleJobs || staleJobs.length === 0) return
+
+ console.log(`[reaper] Found ${staleJobs.length} stale running job(s)`)
+
+ for (const job of staleJobs) {
+ if (job.attempt < job.max_attempts) {
+ // Re-queue for retry
+ const { data: updated } = await sb
+ .from('job_queue')
+ .update({
+ status: 'queued',
+ worker_id: null,
+ claimed_at: null,
+ started_at: null,
+ heartbeat_at: null,
+ attempt: job.attempt + 1,
+ error: `Re-queued by reaper: worker stopped responding (attempt ${job.attempt + 1}/${job.max_attempts})`,
+ })
+ .eq('id', job.id)
+ .eq('status', 'running') // CAS: only if still running
+ .select('id')
+
+ if (updated && updated.length > 0) {
+ console.log(`[reaper] Re-queued job ${job.id} (attempt ${job.attempt + 1}/${job.max_attempts})`)
+
+ if (job.telegram_chat_id) {
+ try {
+ await bot.api.sendMessage(
+ job.telegram_chat_id,
+ `[${job.id.slice(0, 8)}] Worker stopped responding. `
+ + `Re-queuing automatically (attempt ${job.attempt + 1}/${job.max_attempts}).`,
+ { parse_mode: 'HTML' },
+ )
+ } catch {
+ // Best effort notification
+ }
+ }
+
+ wakeWorker()
+ }
+ } else {
+ // Max attempts exceeded — mark as permanently failed
+ const { data: updated } = await sb
+ .from('job_queue')
+ .update({
+ status: 'failed',
+ completed_at: new Date().toISOString(),
+ heartbeat_at: null,
+ error: `Failed: worker stopped responding after ${job.max_attempts} attempts`,
+ })
+ .eq('id', job.id)
+ .eq('status', 'running') // CAS
+ .select('id')
+
+ if (updated && updated.length > 0) {
+ console.log(`[reaper] Job ${job.id} permanently failed (max attempts)`)
+
+ if (job.telegram_chat_id) {
+ try {
+ await bot.api.sendMessage(
+ job.telegram_chat_id,
+ `[${job.id.slice(0, 8)}] Worker stopped responding. `
+ + `Job has failed permanently after ${job.max_attempts} attempts.`,
+ { parse_mode: 'HTML' },
+ )
+ } catch {
+ // Best effort notification
+ }
+ }
+ }
+ }
+ }
+
+ // Also check for stale claimed jobs (worker died before transitioning to running)
+ const claimedCutoff = new Date(Date.now() - STALE_CLAIMED_MS).toISOString()
+ const { data: staleClaimed } = await sb
+ .from('job_queue')
+ .select('id, worker_id')
+ .eq('status', 'claimed')
+ .lt('claimed_at', claimedCutoff)
+
+ if (staleClaimed && staleClaimed.length > 0) {
+ for (const job of staleClaimed) {
+ await sb
+ .from('job_queue')
+ .update({
+ status: 'queued',
+ worker_id: null,
+ claimed_at: null,
+ heartbeat_at: null,
+ error: `Re-queued by reaper: claimed by ${job.worker_id} but never started`,
+ })
+ .eq('id', job.id)
+ .eq('status', 'claimed') // CAS
+
+ console.log(`[reaper] Reset stale claimed job ${job.id}`)
+ }
+ wakeWorker()
+ }
+ } catch (err) {
+ console.error('[reaper] Unexpected error:', err)
+ }
+ }, REAPER_INTERVAL_MS)
+
+ console.log(
+ `[reaper] Stale job reaper started (interval: ${REAPER_INTERVAL_MS}ms, staleness: ${STALE_HEARTBEAT_MS}ms)`,
+ )
+}
+
// ---------------------------------------------------------------------------
// Startup
// ---------------------------------------------------------------------------
@@ -901,6 +1034,9 @@ async function main(): Promise {
// intentional -- we want a loud failure at startup.
startRealtimeBridge()
+ // Start the stale job reaper — detects dead workers via heartbeat expiry
+ startReaper()
+
// Start long polling. This will block until the process is stopped.
console.log('Bot is now polling for updates.')
await bot.start({
diff --git a/apps/worker/src/queue-poller.ts b/apps/worker/src/queue-poller.ts
index de9d412..8b34f17 100644
--- a/apps/worker/src/queue-poller.ts
+++ b/apps/worker/src/queue-poller.ts
@@ -1,6 +1,6 @@
import { createClient, type SupabaseClient } from '@supabase/supabase-js'
import type { Job } from '@wright/shared'
-import { POLL_INTERVAL_MS, STALE_CLAIMED_MS, STALE_RUNNING_MS } from '@wright/shared'
+import { POLL_INTERVAL_MS, STALE_CLAIMED_MS, STALE_RUNNING_MS, HEARTBEAT_INTERVAL_MS } from '@wright/shared'
import { runDevLoop } from './dev-loop.js'
// Worker identity — use Fly machine ID if available, otherwise hostname
@@ -120,6 +120,7 @@ export async function requeueCurrentJob(): Promise {
worker_id: null,
claimed_at: null,
started_at: null,
+ heartbeat_at: null,
attempt: job.attempt + 1,
error: `Re-queued: worker shutdown (SIGTERM), attempt ${job.attempt + 1}/${job.max_attempts}`,
})
@@ -141,6 +142,7 @@ export async function requeueCurrentJob(): Promise {
.update({
status: 'failed',
completed_at: new Date().toISOString(),
+ heartbeat_at: null,
error: `Failed after ${job.max_attempts} attempts (worker restarts)`,
})
.eq('id', job.id)
@@ -237,13 +239,26 @@ async function processJob(job: Job): Promise {
`[queue-poller] Processing job ${job.id} (attempt ${job.attempt})`,
)
+ // Start heartbeat interval — proves this worker is alive while processing
+ const heartbeatTimer = setInterval(async () => {
+ try {
+ await supabase!
+ .from('job_queue')
+ .update({ heartbeat_at: new Date().toISOString() })
+        .eq('id', job.id).eq('status', 'running') // CAS: stop heartbeating once the job is no longer running (reaped or finished)
+ } catch (err) {
+ console.error(`[heartbeat] Failed to update heartbeat for ${job.id}:`, err)
+ }
+ }, HEARTBEAT_INTERVAL_MS)
+
try {
- // Mark as running
+ // Mark as running with initial heartbeat
await supabase
.from('job_queue')
.update({
status: 'running',
started_at: new Date().toISOString(),
+ heartbeat_at: new Date().toISOString(),
})
.eq('id', job.id)
@@ -297,6 +312,7 @@ async function processJob(job: Job): Promise {
})
.eq('id', job.id)
} finally {
+ clearInterval(heartbeatTimer)
if (onJobEnd) onJobEnd(job.id)
currentJob = null
currentAbortController = null
@@ -329,11 +345,53 @@ async function startupCleanup(): Promise {
status: 'queued',
worker_id: null,
claimed_at: null,
+ heartbeat_at: null,
})
.eq('id', job.id)
}
}
+ // 1b. Reset jobs still 'running' for this worker (interrupted by crash)
+ const { data: staleRunningThisWorker } = await supabase
+ .from('job_queue')
+ .select('id, attempt, max_attempts')
+ .eq('status', 'running')
+ .eq('worker_id', WORKER_ID)
+
+ if (staleRunningThisWorker && staleRunningThisWorker.length > 0) {
+ console.log(
+ `[queue-poller] Found ${staleRunningThisWorker.length} running job(s) from this worker (crash recovery)`,
+ )
+ for (const job of staleRunningThisWorker) {
+ if (job.attempt < job.max_attempts) {
+ await supabase
+ .from('job_queue')
+ .update({
+ status: 'queued',
+ worker_id: null,
+ claimed_at: null,
+ started_at: null,
+ heartbeat_at: null,
+ attempt: job.attempt + 1,
+ error: `Re-queued: worker crash recovery on startup (attempt ${job.attempt + 1}/${job.max_attempts})`,
+ })
+          .eq('id', job.id).eq('status', 'running') // CAS: skip if the reaper already re-queued it since our SELECT
+ console.log(`[queue-poller] Re-queued running job ${job.id} (attempt ${job.attempt + 1}/${job.max_attempts})`)
+ } else {
+ await supabase
+ .from('job_queue')
+ .update({
+ status: 'failed',
+ completed_at: new Date().toISOString(),
+ heartbeat_at: null,
+ error: `Failed: worker crashed after ${job.max_attempts} attempts`,
+ })
+          .eq('id', job.id).eq('status', 'running') // CAS: skip if the reaper already re-queued it since our SELECT
+ console.log(`[queue-poller] Job ${job.id} permanently failed (max attempts exceeded)`)
+ }
+ }
+ }
+
// 2. Reset jobs claimed by ANY worker for too long
const staleClaimedCutoff = new Date(
Date.now() - STALE_CLAIMED_MS,
@@ -355,6 +413,7 @@ async function startupCleanup(): Promise {
status: 'queued',
worker_id: null,
claimed_at: null,
+ heartbeat_at: null,
error: `Reset: claimed by ${job.worker_id} but never started`,
})
.eq('id', job.id)
@@ -384,6 +443,7 @@ async function startupCleanup(): Promise {
worker_id: null,
claimed_at: null,
started_at: null,
+ heartbeat_at: null,
attempt: job.attempt + 1,
error: `Re-queued: abandoned running job (attempt ${job.attempt + 1}/${job.max_attempts})`,
})
@@ -394,6 +454,7 @@ async function startupCleanup(): Promise {
.update({
status: 'failed',
completed_at: new Date().toISOString(),
+ heartbeat_at: null,
error: `Failed: abandoned after ${job.max_attempts} attempts`,
})
.eq('id', job.id)
diff --git a/packages/shared/src/constants.ts b/packages/shared/src/constants.ts
index 1cd1fbf..f30b142 100644
--- a/packages/shared/src/constants.ts
+++ b/packages/shared/src/constants.ts
@@ -50,6 +50,25 @@ export const STALE_CLAIMED_MS = 2 * 60 * 1000 // 2 minutes
*/
export const STALE_RUNNING_MS = 30 * 60 * 1000 // 30 minutes
+/**
+ * How often the worker sends a heartbeat while processing a job (ms).
+ */
+export const HEARTBEAT_INTERVAL_MS = 30_000 // 30 seconds
+
+/**
+ * How long a running job can go without a heartbeat before being
+ * considered stale (ms). Must be > HEARTBEAT_INTERVAL_MS.
+ *
+ * Set to 3x the heartbeat interval to tolerate transient delays
+ * (slow DB writes, GC pauses, etc.)
+ */
+export const STALE_HEARTBEAT_MS = 90_000 // 90 seconds
+
+/**
+ * How often the bot-side reaper scans for stale running and stale claimed jobs (ms).
+ */
+export const REAPER_INTERVAL_MS = 60_000 // 60 seconds
+
/**
* Supabase table names.
*/
diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts
index a405d3c..ce37698 100644
--- a/packages/shared/src/types.ts
+++ b/packages/shared/src/types.ts
@@ -64,6 +64,7 @@ export interface Job {
claimed_at?: string
started_at?: string
completed_at?: string
+ heartbeat_at?: string
// Error details on failure
error?: string
diff --git a/supabase/migrations/20260319000000_add_heartbeat_at.sql b/supabase/migrations/20260319000000_add_heartbeat_at.sql
new file mode 100644
index 0000000..f9ff7c1
--- /dev/null
+++ b/supabase/migrations/20260319000000_add_heartbeat_at.sql
@@ -0,0 +1,5 @@
+-- Add heartbeat_at column for worker liveness detection.
+-- Workers update this timestamp every 30s while processing a job.
+-- The bot-side reaper checks for stale heartbeats every 60s and
+-- re-queues jobs whose workers have stopped responding.
+ALTER TABLE job_queue ADD COLUMN heartbeat_at TIMESTAMPTZ;