diff --git a/cloudflare-gastown/src/db/tables/beads.table.ts b/cloudflare-gastown/src/db/tables/beads.table.ts index e629017a1..bf23f2910 100644 --- a/cloudflare-gastown/src/db/tables/beads.table.ts +++ b/cloudflare-gastown/src/db/tables/beads.table.ts @@ -50,6 +50,8 @@ export const BeadRecord = z.object({ } }) .pipe(z.record(z.string(), z.any())), // z.any() needed for Rpc.Serializable compatibility + dispatch_attempts: z.number().int().default(0), + last_dispatch_attempt_at: z.string().nullable().default(null), created_by: z.string().nullable(), created_at: z.string(), updated_at: z.string(), @@ -129,10 +131,20 @@ export function createTableBeads(): string { created_by: `text`, created_at: `text not null`, updated_at: `text not null`, + dispatch_attempts: `integer not null default 0`, + last_dispatch_attempt_at: `text`, closed_at: `text`, }); } +/** Idempotent ALTER statements for existing databases. */ +export function migrateBeads(): string[] { + return [ + `ALTER TABLE beads ADD COLUMN dispatch_attempts integer not null default 0`, + `ALTER TABLE beads ADD COLUMN last_dispatch_attempt_at text`, + ]; +} + export function getIndexesBeads(): string[] { return [ `CREATE INDEX IF NOT EXISTS idx_beads_type_status ON ${beads}(${beads.columns.type}, ${beads.columns.status})`, diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index b3eeaa158..071ce42f9 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -1555,6 +1555,25 @@ export class TownDO extends DurableObject { switch (action) { case 'RESTART': case 'RESTART_WITH_BACKOFF': { + // Fix 4 (#1653): if the hooked bead has exhausted its dispatch + // attempts, fail it instead of restarting — prevents infinite loops. + const restartBeadId = snapshotHookedBeadId ?? targetAgent?.current_hook_bead_id; + if (restartBeadId) { + const restartBead = beadOps.getBead(this.sql, restartBeadId); + if (restartBead && restartBead.dispatch_attempts >= scheduling.MAX_DISPATCH_ATTEMPTS) { + beadOps.updateBeadStatus(this.sql, restartBeadId, 'failed', input.agent_id); + if (targetAgent?.current_hook_bead_id === restartBeadId) { + if (targetAgent.status === 'working' || targetAgent.status === 'stalled') { + dispatch + .stopAgentInContainer(this.env, this.townId, targetAgentId) + .catch(() => {}); + } + agents.unhookBead(this.sql, targetAgentId); + } + break; + } + } + // Stop the agent in the container, reset to idle so the // scheduler picks it up again on the next alarm cycle. if (targetAgent?.status === 'working' || targetAgent?.status === 'stalled') { diff --git a/cloudflare-gastown/src/dos/town/actions.ts b/cloudflare-gastown/src/dos/town/actions.ts index 5f799ec90..199c15a7f 100644 --- a/cloudflare-gastown/src/dos/town/actions.ts +++ b/cloudflare-gastown/src/dos/town/actions.ts @@ -517,6 +517,19 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro `, [agentId] ); + // Track dispatch attempts on the bead itself (not just the agent). + // The bead counter is never reset by hookBead, preventing the + // infinite retry loop (#1653). + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.dispatch_attempts} = ${beads.columns.dispatch_attempts} + 1, + ${beads.columns.last_dispatch_attempt_at} = ? + WHERE ${beads.bead_id} = ? + `, + [now(), beadId] + ); beadOps.updateBeadStatus(sql, beadId, 'in_progress', agentId); const capturedAgentId = agentId; diff --git a/cloudflare-gastown/src/dos/town/agents.ts b/cloudflare-gastown/src/dos/town/agents.ts index c208a3834..39ce9c591 100644 --- a/cloudflare-gastown/src/dos/town/agents.ts +++ b/cloudflare-gastown/src/dos/town/agents.ts @@ -282,13 +282,16 @@ export function hookBead(sql: SqlStorage, agentId: string, beadId: string): void unhookBead(sql, stale.bead_id); } + // Do NOT reset dispatch_attempts here — per-bead dispatch tracking + // lives on the beads table now (beads.dispatch_attempts). Resetting + // the agent counter on every hook was the root cause of the infinite + // retry loop (#1653). query( sql, /* sql */ ` UPDATE ${agent_metadata} SET ${agent_metadata.columns.current_hook_bead_id} = ?, ${agent_metadata.columns.status} = 'idle', - ${agent_metadata.columns.dispatch_attempts} = 0, ${agent_metadata.columns.last_activity_at} = ?, ${agent_metadata.columns.agent_status_message} = NULL, ${agent_metadata.columns.agent_status_updated_at} = NULL diff --git a/cloudflare-gastown/src/dos/town/beads.ts b/cloudflare-gastown/src/dos/town/beads.ts index 5fd9463fa..85a9e8a6b 100644 --- a/cloudflare-gastown/src/dos/town/beads.ts +++ b/cloudflare-gastown/src/dos/town/beads.ts @@ -4,7 +4,13 @@ */ import { z } from 'zod'; -import { beads, BeadRecord, createTableBeads, getIndexesBeads } from '../../db/tables/beads.table'; +import { + beads, + BeadRecord, + createTableBeads, + getIndexesBeads, + migrateBeads, +} from '../../db/tables/beads.table'; import { bead_events, BeadEventRecord, @@ -65,7 +71,7 @@ export function initBeadTables(sql: SqlStorage): void { dropCheckConstraints(sql); // Migrations: add columns to existing tables (idempotent) - for (const stmt of [...migrateConvoyMetadata(), ...migrateAgentMetadata()]) { + for (const stmt of [...migrateBeads(), ...migrateConvoyMetadata(), ...migrateAgentMetadata()]) { try { query(sql, stmt, []); } catch { diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index e9fef8d21..4a42a231d 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -25,7 +25,7 @@ import { AGENT_GC_RETENTION_MS, TRIAGE_LABEL_LIKE, } from './patrol'; -import { DISPATCH_COOLDOWN_MS, MAX_DISPATCH_ATTEMPTS } from './scheduling'; +import { MAX_DISPATCH_ATTEMPTS, getDispatchBackoffMs } from './scheduling'; import * as reviewQueue from './review-queue'; import * as agents from './agents'; import * as beadOps from './beads'; @@ -35,6 +35,13 @@ import type { TownEventRecord } from '../../db/tables/town-events.table'; const LOG = '[reconciler]'; +// ── Circuit breaker constants ──────────────────────────────────────── + +/** Number of total dispatch failures in a 30-min window that trips the circuit breaker. */ +const CIRCUIT_BREAKER_FAILURE_THRESHOLD = 20; +/** Window (ms) over which dispatch failures are counted. */ +const CIRCUIT_BREAKER_WINDOW_MS = 30 * 60_000; // 30 min + // ── Timeouts (from spec §7) ───────────────────────────────────────── /** Reset non-PR MR beads stuck in_progress with no working agent */ @@ -61,6 +68,33 @@ function staleMs(timestamp: string | null, thresholdMs: number): boolean { return Date.now() - new Date(timestamp).getTime() > thresholdMs; } +/** + * Town-level dispatch circuit breaker (#1653). + * Counts total dispatch_attempts across all beads with last_dispatch_attempt_at + * in the recent window. If it exceeds the threshold, returns the failure count. + * Returns 0 if the circuit breaker is not tripped. + */ +function checkDispatchCircuitBreaker(sql: SqlStorage): number { + const cutoff = new Date(Date.now() - CIRCUIT_BREAKER_WINDOW_MS).toISOString(); + const rows = z + .object({ total_attempts: z.number() }) + .array() + .parse([ + ...query( + sql, + /* sql */ ` + SELECT COALESCE(SUM(${beads.columns.dispatch_attempts}), 0) as total_attempts + FROM ${beads} + WHERE ${beads.columns.last_dispatch_attempt_at} > ? + AND ${beads.columns.dispatch_attempts} > 0 + `, + [cutoff] + ), + ]); + const totalAttempts = rows[0]?.total_attempts ?? 0; + return totalAttempts >= CIRCUIT_BREAKER_FAILURE_THRESHOLD ? totalAttempts : 0; +} + // ── Row schemas for queries ───────────────────────────────────────── // Derived from table record schemas for traceability back to table defs. @@ -89,6 +123,8 @@ const BeadRow = BeadRecord.pick({ updated_at: true, labels: true, created_by: true, + dispatch_attempts: true, + last_dispatch_attempt_at: true, }); type BeadRow = z.infer; @@ -460,6 +496,22 @@ export function reconcileAgents(sql: SqlStorage): Action[] { export function reconcileBeads(sql: SqlStorage): Action[] { const actions: Action[] = []; + // Town-level circuit breaker (#1653): if too many dispatch failures + // in the recent window, skip all dispatch_agent actions and escalate. + const cbFailures = checkDispatchCircuitBreaker(sql); + if (cbFailures > 0) { + console.warn( + `${LOG} circuit breaker OPEN: ${cbFailures} dispatch attempts in last 30min (threshold=${CIRCUIT_BREAKER_FAILURE_THRESHOLD}). Skipping all dispatches.` + ); + actions.push({ + type: 'notify_mayor', + message: `Dispatch circuit breaker is OPEN: ${cbFailures} dispatch attempts across all beads in the last 30 minutes exceeds threshold of ${CIRCUIT_BREAKER_FAILURE_THRESHOLD}. All dispatches are paused until the window expires.`, + }); + // Still run non-dispatch rules (stale bead cleanup, in-review checks) + // but skip Rule 1 and Rule 2 (dispatch paths) + return [...actions, ...reconcileBeadsNonDispatch(sql)]; + } + // Rule 1: Open issue beads with no assignee, no blockers, not staged, not triage const unassigned = BeadRow.array().parse([ ...query( @@ -470,7 +522,9 @@ export function reconcileBeads(sql: SqlStorage): Action[] { b.${beads.columns.assignee_agent_bead_id}, b.${beads.columns.updated_at}, b.${beads.columns.labels}, - b.${beads.columns.created_by} + b.${beads.columns.created_by}, + b.${beads.columns.dispatch_attempts}, + b.${beads.columns.last_dispatch_attempt_at} FROM ${beads} b WHERE b.${beads.columns.type} = 'issue' AND b.${beads.columns.status} = 'open' @@ -498,6 +552,24 @@ export function reconcileBeads(sql: SqlStorage): Action[] { for (const bead of unassigned) { if (!bead.rig_id) continue; + + // Per-bead dispatch cap (#1653): fail beads that have exhausted retries + if (bead.dispatch_attempts >= MAX_DISPATCH_ATTEMPTS) { + actions.push({ + type: 'transition_bead', + bead_id: bead.bead_id, + from: 'open', + to: 'failed', + reason: `max dispatch attempts exceeded (${bead.dispatch_attempts}/${MAX_DISPATCH_ATTEMPTS})`, + actor: 'system', + }); + continue; + } + + // Exponential backoff: respect cooldown based on attempt count (#1653) + const backoffMs = getDispatchBackoffMs(bead.dispatch_attempts); + if (!staleMs(bead.last_dispatch_attempt_at, backoffMs)) continue; + // In shadow mode we can't call getOrCreateAgent, so we just note // that a hook_agent + dispatch_agent is needed. // The action includes rig_id so Phase 3's applyAction can resolve the agent. @@ -532,36 +604,21 @@ export function reconcileBeads(sql: SqlStorage): Action[] { for (const agent of idleHooked) { if (!agent.current_hook_bead_id) continue; - // Check dispatch cooldown - if (!staleMs(agent.last_activity_at, DISPATCH_COOLDOWN_MS)) continue; - - // Check max dispatch attempts - if (agent.dispatch_attempts >= MAX_DISPATCH_ATTEMPTS) { - actions.push({ - type: 'transition_bead', - bead_id: agent.current_hook_bead_id, - from: null, - to: 'failed', - reason: 'max dispatch attempts exceeded', - actor: 'system', - }); - actions.push({ - type: 'unhook_agent', - agent_id: agent.bead_id, - reason: 'max dispatch attempts', - }); - continue; - } - - // Check if the hooked bead is open and unblocked + // Check if the hooked bead is open and unblocked, and read bead-level dispatch tracking const hookedRows = z - .object({ status: z.string(), rig_id: z.string().nullable() }) + .object({ + status: z.string(), + rig_id: z.string().nullable(), + dispatch_attempts: z.number().default(0), + last_dispatch_attempt_at: z.string().nullable().default(null), + }) .array() .parse([ ...query( sql, /* sql */ ` - SELECT ${beads.status}, ${beads.rig_id} + SELECT ${beads.status}, ${beads.rig_id}, + ${beads.dispatch_attempts}, ${beads.last_dispatch_attempt_at} FROM ${beads} WHERE ${beads.bead_id} = ? `, @@ -573,6 +630,28 @@ export function reconcileBeads(sql: SqlStorage): Action[] { const hooked = hookedRows[0]; if (hooked.status !== 'open') continue; + // Per-bead dispatch cap (#1653): use bead.dispatch_attempts, not agent's + if (hooked.dispatch_attempts >= MAX_DISPATCH_ATTEMPTS) { + actions.push({ + type: 'transition_bead', + bead_id: agent.current_hook_bead_id, + from: null, + to: 'failed', + reason: `max dispatch attempts exceeded (${hooked.dispatch_attempts}/${MAX_DISPATCH_ATTEMPTS})`, + actor: 'system', + }); + actions.push({ + type: 'unhook_agent', + agent_id: agent.bead_id, + reason: 'max dispatch attempts', + }); + continue; + } + + // Exponential backoff based on bead attempt count (#1653) + const backoffMs = getDispatchBackoffMs(hooked.dispatch_attempts); + if (!staleMs(hooked.last_dispatch_attempt_at, backoffMs)) continue; + // Check blockers const blockerCount = z .object({ cnt: z.number() }) @@ -602,6 +681,15 @@ export function reconcileBeads(sql: SqlStorage): Action[] { }); } + actions.push(...reconcileBeadsNonDispatch(sql)); + + return actions; +} + +/** Rules 3 + 4 of reconcileBeads — separated so the circuit breaker can skip dispatch rules. */ +function reconcileBeadsNonDispatch(sql: SqlStorage): Action[] { + const actions: Action[] = []; + // Rule 3: In-progress issue beads with no working/stalled agent const staleInProgress = BeadRow.array().parse([ ...query( @@ -612,7 +700,9 @@ export function reconcileBeads(sql: SqlStorage): Action[] { b.${beads.columns.assignee_agent_bead_id}, b.${beads.columns.updated_at}, b.${beads.columns.labels}, - b.${beads.columns.created_by} + b.${beads.columns.created_by}, + b.${beads.columns.dispatch_attempts}, + b.${beads.columns.last_dispatch_attempt_at} FROM ${beads} b WHERE b.${beads.columns.type} = 'issue' AND b.${beads.columns.status} = 'in_progress' @@ -649,6 +739,24 @@ export function reconcileBeads(sql: SqlStorage): Action[] { if (hookedAgent.length > 0) continue; + // Fix 4 (#1653): beads at max dispatch attempts should transition + // to 'failed' instead of 'open' to prevent infinite retry loops. + if (bead.dispatch_attempts >= MAX_DISPATCH_ATTEMPTS) { + actions.push({ + type: 'transition_bead', + bead_id: bead.bead_id, + from: 'in_progress', + to: 'failed', + reason: `agent lost, max dispatch attempts exhausted (${bead.dispatch_attempts}/${MAX_DISPATCH_ATTEMPTS})`, + actor: 'system', + }); + actions.push({ + type: 'clear_bead_assignee', + bead_id: bead.bead_id, + }); + continue; + } + actions.push({ type: 'transition_bead', bead_id: bead.bead_id, @@ -673,7 +781,9 @@ export function reconcileBeads(sql: SqlStorage): Action[] { b.${beads.columns.assignee_agent_bead_id}, b.${beads.columns.updated_at}, b.${beads.columns.labels}, - b.${beads.columns.created_by} + b.${beads.columns.created_by}, + b.${beads.columns.dispatch_attempts}, + b.${beads.columns.last_dispatch_attempt_at} FROM ${beads} b WHERE b.${beads.columns.type} = 'issue' AND b.${beads.columns.status} = 'in_review' @@ -746,6 +856,9 @@ export function reconcileBeads(sql: SqlStorage): Action[] { export function reconcileReviewQueue(sql: SqlStorage): Action[] { const actions: Action[] = []; + // Town-level circuit breaker (#1653): skip dispatch-related rules + const circuitBreakerOpen = checkDispatchCircuitBreaker(sql) > 0; + // Get all MR beads that need attention const mrBeads = MrBeadRow.array().parse([ ...query( @@ -847,6 +960,9 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { } } + // Skip dispatch rules when circuit breaker is open + if (circuitBreakerOpen) return actions; + // Rule 5: Pop open MR bead for idle refinery // Get all rigs that have open MR beads const rigsWithOpenMrs = z @@ -1000,39 +1116,21 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { for (const ref of idleRefineries) { if (!ref.current_hook_bead_id) continue; - // Cooldown: skip if last activity is too recent (#1342) - if (!staleMs(ref.last_activity_at, DISPATCH_COOLDOWN_MS)) continue; - - // Circuit-breaker: fail the MR bead after too many attempts (#1342) - if (ref.dispatch_attempts >= MAX_DISPATCH_ATTEMPTS) { - actions.push({ - type: 'transition_bead', - bead_id: ref.current_hook_bead_id, - from: null, - to: 'failed', - reason: 'refinery max dispatch attempts exceeded', - actor: 'system', - }); - actions.push({ - type: 'unhook_agent', - agent_id: ref.bead_id, - reason: 'max dispatch attempts', - }); - continue; - } - const mrRows = z .object({ status: z.string(), type: z.string(), rig_id: z.string().nullable(), + dispatch_attempts: z.number().default(0), + last_dispatch_attempt_at: z.string().nullable().default(null), }) .array() .parse([ ...query( sql, /* sql */ ` - SELECT ${beads.status}, ${beads.type}, ${beads.rig_id} + SELECT ${beads.status}, ${beads.type}, ${beads.rig_id}, + ${beads.dispatch_attempts}, ${beads.last_dispatch_attempt_at} FROM ${beads} WHERE ${beads.bead_id} = ? `, @@ -1044,6 +1142,28 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { const mr = mrRows[0]; if (mr.type !== 'merge_request' || mr.status !== 'in_progress') continue; + // Per-bead dispatch cap (#1653): use bead.dispatch_attempts, not agent's + if (mr.dispatch_attempts >= MAX_DISPATCH_ATTEMPTS) { + actions.push({ + type: 'transition_bead', + bead_id: ref.current_hook_bead_id, + from: null, + to: 'failed', + reason: `refinery max dispatch attempts exceeded (${mr.dispatch_attempts}/${MAX_DISPATCH_ATTEMPTS})`, + actor: 'system', + }); + actions.push({ + type: 'unhook_agent', + agent_id: ref.bead_id, + reason: 'max dispatch attempts', + }); + continue; + } + + // Exponential backoff based on bead attempt count (#1653) + const backoffMs = getDispatchBackoffMs(mr.dispatch_attempts); + if (!staleMs(mr.last_dispatch_attempt_at, backoffMs)) continue; + // Container status is checked at apply time (async). In shadow mode, // we just note that a dispatch is needed. actions.push({ diff --git a/cloudflare-gastown/src/dos/town/scheduling.ts b/cloudflare-gastown/src/dos/town/scheduling.ts index 2c48347b5..c1e913607 100644 --- a/cloudflare-gastown/src/dos/town/scheduling.ts +++ b/cloudflare-gastown/src/dos/town/scheduling.ts @@ -23,7 +23,18 @@ const LOG = '[scheduling]'; // ── Constants ────────────────────────────────────────────────────────── export const DISPATCH_COOLDOWN_MS = 2 * 60_000; // 2 min -export const MAX_DISPATCH_ATTEMPTS = 20; +export const MAX_DISPATCH_ATTEMPTS = 5; + +/** + * Exponential backoff schedule for dispatch retries, indexed by attempt number. + * Caps total retry window at ~1h for 5 attempts. + */ +export function getDispatchBackoffMs(attempts: number): number { + if (attempts <= 2) return DISPATCH_COOLDOWN_MS; // 2 min (existing behavior) + if (attempts === 3) return 5 * 60_000; // 5 min + if (attempts === 4) return 10 * 60_000; // 10 min + return 30 * 60_000; // 30 min (attempt 5+) +} // ── Context passed by the Town DO ────────────────────────────────────── @@ -109,6 +120,18 @@ export async function dispatchAgent( `, [timestamp, agent.id] ); + // Track dispatch attempts on the bead itself — the bead counter + // is never reset by hookBead, preventing infinite retry loops (#1653). + query( + ctx.sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.dispatch_attempts} = ${beads.columns.dispatch_attempts} + 1, + ${beads.columns.last_dispatch_attempt_at} = ? + WHERE ${beads.bead_id} = ? + `, + [timestamp, bead.bead_id] + ); const started = await dispatch.startAgentInContainer(ctx.env, ctx.storage, { townId: ctx.townId, @@ -170,6 +193,7 @@ export async function dispatchAgent( agentId: agent.id, beadId: bead.bead_id, role: agent.role, + label: 'container returned false', }); } return started; @@ -178,6 +202,7 @@ export async function dispatchAgent( Sentry.captureException(err, { extra: { agentId: agent.id, beadId: bead.bead_id }, }); + const reason = err instanceof Error ? err.message : String(err); try { query( ctx.sql, @@ -199,6 +224,7 @@ export async function dispatchAgent( agentId: agent.id, beadId: bead.bead_id, role: agent.role, + label: reason, }); return false; }