diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index 70ba58c62..2e7d4793e 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -10,10 +10,11 @@ import { activeServerCount, getUptime, stopAll, + drainAll, getAgentEvents, registerEventSink, } from './process-manager'; -import { startHeartbeat, stopHeartbeat } from './heartbeat'; +import { startHeartbeat, stopHeartbeat, notifyContainerReady } from './heartbeat'; import { pushContext as pushDashboardContext } from './dashboard-context'; import { mergeBranch, setupRigBrowseWorktree } from './git-manager'; import { @@ -92,6 +93,15 @@ app.use('*', async (c, next) => { // GET /health app.get('/health', c => { + // When the TownDO is draining, it passes the drain nonce and town + // ID via headers so idle containers (no running agents) can + // acknowledge readiness and clear the drain flag. + const drainNonce = c.req.header('X-Drain-Nonce'); + const townId = c.req.header('X-Town-Id'); + if (drainNonce && townId) { + void notifyContainerReady(townId, drainNonce); + } + const response: HealthResponse = { status: 'ok', agents: activeAgentCount(), @@ -723,7 +733,7 @@ export function startControlServer(): void { startHeartbeat(apiUrl, authToken); } - // Handle graceful shutdown + // Handle graceful shutdown (immediate, no drain — used by SIGINT for dev) const shutdown = async () => { console.log('Shutting down control server...'); stopHeartbeat(); @@ -731,7 +741,18 @@ export function startControlServer(): void { process.exit(0); }; - process.on('SIGTERM', () => void shutdown()); + process.on( + 'SIGTERM', + () => + void (async () => { + console.log('[control-server] SIGTERM received — starting graceful drain...'); + stopHeartbeat(); + await drainAll(); + await stopAll(); + process.exit(0); + })() + ); + process.on('SIGINT', () => void shutdown()); // Track connected WebSocket clients with 
optional agent filter diff --git a/cloudflare-gastown/container/src/heartbeat.ts b/cloudflare-gastown/container/src/heartbeat.ts index bd9dd8db3..defbe7897 100644 --- a/cloudflare-gastown/container/src/heartbeat.ts +++ b/cloudflare-gastown/container/src/heartbeat.ts @@ -6,6 +6,8 @@ const HEARTBEAT_INTERVAL_MS = 30_000; let heartbeatTimer: ReturnType<typeof setInterval> | null = null; let gastownApiUrl: string | null = null; let sessionToken: string | null = null; +/** Set once we've successfully acknowledged container-ready. */ +let containerReadyAcknowledged = false; /** * Configure and start the heartbeat reporter. @@ -38,6 +40,49 @@ export function stopHeartbeat(): void { console.log('Heartbeat reporter stopped'); } +/** + * Notify the TownDO that the replacement container is ready. + * Exported so the health endpoint can trigger it when the TownDO + * passes the drain nonce via headers (handles idle containers that + * have no running agents and thus no per-agent heartbeats). + */ +export async function notifyContainerReady(townId: string, drainNonce: string): Promise<void> { + if (containerReadyAcknowledged) return; + await acknowledgeContainerReady(townId, drainNonce); +} + +/** + * Call POST /container-ready to acknowledge that this is a fresh + * container replacing an evicted one. Clears the TownDO drain flag + * so the reconciler can resume dispatching. + */ +async function acknowledgeContainerReady(townId: string, drainNonce: string): Promise<void> { + const apiUrl = gastownApiUrl ?? process.env.GASTOWN_API_URL; + const currentToken = process.env.GASTOWN_CONTAINER_TOKEN ?? 
sessionToken; + if (!apiUrl || !currentToken) return; + + try { + const response = await fetch(`${apiUrl}/api/towns/${townId}/container-ready`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${currentToken}`, + }, + body: JSON.stringify({ nonce: drainNonce }), + }); + if (response.ok) { + containerReadyAcknowledged = true; + console.log(`[heartbeat] container-ready acknowledged for town=${townId}`); + } else { + console.warn( + `[heartbeat] container-ready failed for town=${townId}: ${response.status} ${response.statusText}` + ); + } + } catch (err) { + console.warn(`[heartbeat] container-ready error for town=${townId}:`, err); + } +} + async function sendHeartbeats(): Promise<void> { // Prefer the live container token (refreshed via POST /refresh-token) // over the token captured at startHeartbeat() time. @@ -46,6 +91,12 @@ async function sendHeartbeats(): Promise<void> { const active = listAgents().filter(a => a.status === 'running' || a.status === 'starting'); + // When no agents are active, the per-agent heartbeat loop has + // nothing to send. Idle container drain acknowledgment is handled + // by the /health endpoint instead (the TownDO passes the nonce via + // X-Drain-Nonce headers in ensureContainerReady). + if (active.length === 0) return; + for (const agent of active) { const payload: HeartbeatPayload = { agentId: agent.agentId, @@ -77,6 +128,18 @@ async function sendHeartbeats(): Promise<void> { console.warn( `Heartbeat failed for agent ${agent.agentId}: ${response.status} ${response.statusText}` ); + } else if (!containerReadyAcknowledged) { + // If the TownDO is draining, the heartbeat response includes a + // drainNonce. Use it to call /container-ready and clear drain. 
+ try { + const body = (await response.json()) as { data?: { drainNonce?: string } }; + const nonce = body?.data?.drainNonce; + if (nonce) { + void acknowledgeContainerReady(agent.townId, nonce); + } + } catch { + // Non-JSON or unexpected shape — ignore + } } } catch (err) { console.warn(`Heartbeat error for agent ${agent.agentId}:`, err); diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index f0008e320..eca69e631 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -973,6 +973,199 @@ export function activeServerCount(): number { return sdkInstances.size; } +/** + * Gracefully drain all running agents before container eviction. + * + * 4-phase sequence: + * 1. Notify TownDO of the eviction (fire-and-forget) + * 2. Nudge running polecats/refineries to commit & push + * 3. Poll up to 10 min waiting for agents to finish + * 4. Force-save any stragglers via WIP git commit + push + * + * Never throws — all errors are logged and swallowed so the caller + * can always proceed to stopAll() + process.exit(). + */ +export async function drainAll(): Promise { + const DRAIN_LOG = '[drain]'; + + // ── Phase 1: Notify TownDO ────────────────────────────────────────── + try { + const apiUrl = process.env.GASTOWN_API_URL; + const token = process.env.GASTOWN_CONTAINER_TOKEN; + // Grab townId from any registered agent — all agents in a container + // belong to the same town. 
+ const anyAgent = [...agents.values()][0]; + const townId = anyAgent?.townId; + + if (apiUrl && token && townId) { + console.log(`${DRAIN_LOG} Phase 1: notifying TownDO of container eviction`); + const resp = await fetch(`${apiUrl}/api/towns/${townId}/container-eviction`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${token}`, + }, + signal: AbortSignal.timeout(10_000), + }); + console.log(`${DRAIN_LOG} Phase 1: TownDO responded ${resp.status}`); + } else { + console.warn( + `${DRAIN_LOG} Phase 1: skipping TownDO notification (missing apiUrl=${!!apiUrl} token=${!!token} townId=${!!townId})` + ); + } + } catch (err) { + console.warn(`${DRAIN_LOG} Phase 1: TownDO notification failed, continuing:`, err); + } + + // ── Phase 2: Nudge running agents to save ─────────────────────────── + const runningAgents = [...agents.values()].filter(a => a.status === 'running'); + console.log(`${DRAIN_LOG} Phase 2: nudging ${runningAgents.length} running agents`); + + for (const agent of runningAgents) { + try { + let nudgeMessage: string | null = null; + + if (agent.role === 'polecat') { + nudgeMessage = + 'URGENT: The container is shutting down in ~15 minutes. Please commit and push your current changes immediately, then call gt_done. You have 2 minutes before a forced save.'; + } else if (agent.role === 'refinery') { + nudgeMessage = + 'URGENT: The container is shutting down. If your review is complete, call gt_done now. Otherwise your work will be pushed as a WIP commit.'; + } + // Mayor and other roles: no nudge needed + + if (nudgeMessage) { + // Cancel the idle timer before nudging — if the agent was + // already idle, the timer could fire mid-nudge and exit the + // agent before it processes the eviction message. 
+ clearIdleTimer(agent.agentId); + console.log(`${DRAIN_LOG} Phase 2: nudging ${agent.role} agent ${agent.agentId}`); + await sendMessage(agent.agentId, nudgeMessage); + } + } catch (err) { + console.warn( + `${DRAIN_LOG} Phase 2: failed to nudge agent ${agent.agentId} (${agent.role}):`, + err + ); + } + } + + // ── Phase 3: Wait up to 10 minutes ────────────────────────────────── + const DRAIN_WAIT_MS = 10 * 60 * 1000; + const pollInterval = 5000; + const start = Date.now(); + console.log(`${DRAIN_LOG} Phase 3: waiting up to ${DRAIN_WAIT_MS / 1000}s for agents to finish`); + + while (Date.now() - start < DRAIN_WAIT_MS) { + const running = [...agents.values()].filter(a => a.status === 'running'); + if (running.length === 0) break; + console.log(`${DRAIN_LOG} Waiting for ${running.length} agents...`); + await new Promise(r => setTimeout(r, pollInterval)); + } + + // ── Phase 4: Force-save remaining agents ──────────────────────────── + // Two sub-steps: first freeze all stragglers (cancel idle timers, + // abort event subscriptions and SDK sessions), then snapshot each + // worktree. Freezing first prevents the normal completion path + // (idle timer → onExit → bead completion) from racing with the WIP + // git save, and avoids .git/index.lock collisions with agent git ops. + const stragglers = [...agents.values()].filter(a => a.status === 'running'); + if (stragglers.length > 0) { + console.log(`${DRAIN_LOG} Phase 4: freezing ${stragglers.length} straggler(s)`); + } else { + console.log(`${DRAIN_LOG} Phase 4: all agents finished, no force-save needed`); + } + + // 4a: Freeze — cancel idle timers and abort sessions so no + // completion/exit callbacks can fire during the git snapshot. + // Only agents that freeze successfully are safe to snapshot. + const frozen: typeof stragglers = []; + for (const agent of stragglers) { + try { + // Cancel idle timer FIRST — prevents the timer from firing and + // marking the agent as completed via onExit() while we abort. 
+ clearIdleTimer(agent.agentId); + + // Abort event subscription + const controller = eventAbortControllers.get(agent.agentId); + if (controller) { + controller.abort(); + eventAbortControllers.delete(agent.agentId); + } + + // Abort the SDK session + const instance = sdkInstances.get(agent.workdir); + if (instance) { + await instance.client.session.abort({ + path: { id: agent.sessionId }, + }); + } + + agent.status = 'exited'; + agent.exitReason = 'container eviction'; + frozen.push(agent); + console.log(`${DRAIN_LOG} Phase 4: froze agent ${agent.agentId}`); + } catch (err) { + // Freeze failed — the session may still be writing to the + // worktree. Skip this agent in 4b to avoid .git/index.lock + // races and partial snapshots. + console.warn( + `${DRAIN_LOG} Phase 4: failed to freeze agent ${agent.agentId}, skipping snapshot:`, + err + ); + } + } + + // 4b: Snapshot — git add/commit/push each worktree now that + // all sessions are frozen. Only iterate agents that froze + // successfully; unfrozen agents are skipped to avoid racing + // with a still-active SDK session. + for (const agent of frozen) { + try { + console.log(`${DRAIN_LOG} Phase 4: force-saving agent ${agent.agentId} in ${agent.workdir}`); + + // Check whether a remote named "origin" exists. Lightweight + // workspaces (mayor/triage) are created with `git init` and + // never add a remote, so pushing would fail with + // "fatal: 'origin' does not appear to be a git repository". + const remoteCheck = Bun.spawn(['git', 'remote', 'get-url', 'origin'], { + cwd: agent.workdir, + stdout: 'pipe', + stderr: 'pipe', + }); + const hasOrigin = (await remoteCheck.exited) === 0; + + const gitCmd = hasOrigin + ? 
"git add -A && git commit --allow-empty -m 'WIP: container eviction save' && git push --set-upstream origin HEAD --no-verify" + : "git add -A && git commit --allow-empty -m 'WIP: container eviction save'"; + + if (!hasOrigin) { + console.warn( + `${DRAIN_LOG} Phase 4: no origin remote for agent ${agent.agentId}, committing locally only (push skipped)` + ); + } + + const proc = Bun.spawn(['bash', '-c', gitCmd], { + cwd: agent.workdir, + stdout: 'pipe', + stderr: 'pipe', + }); + const exitCode = await proc.exited; + const stdout = await new Response(proc.stdout).text(); + const stderr = await new Response(proc.stderr).text(); + console.log( + `${DRAIN_LOG} Phase 4: agent ${agent.agentId} git save exited ${exitCode}` + + (stdout ? ` stdout=${stdout.trim()}` : '') + + (stderr ? ` stderr=${stderr.trim()}` : '') + ); + } catch (err) { + console.warn(`${DRAIN_LOG} Phase 4: force-save failed for agent ${agent.agentId}:`, err); + } + } + + console.log(`${DRAIN_LOG} Drain complete`); +} + export async function stopAll(): Promise { // Cancel all idle timers for (const [, timer] of idleTimers) { diff --git a/cloudflare-gastown/src/db/tables/town-events.table.ts b/cloudflare-gastown/src/db/tables/town-events.table.ts index 95309e7a0..30be09c65 100644 --- a/cloudflare-gastown/src/db/tables/town-events.table.ts +++ b/cloudflare-gastown/src/db/tables/town-events.table.ts @@ -5,6 +5,7 @@ export const TownEventType = z.enum([ 'agent_done', 'agent_completed', 'container_status', + 'container_eviction', 'pr_status_changed', 'bead_created', 'bead_cancelled', diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index b3eeaa158..7cccd35ad 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -504,6 +504,11 @@ export class TownDO extends DurableObject { const townConfig = await config.getTownConfig(this.ctx.storage); this._ownerUserId = townConfig.owner_user_id; + // Load persisted draining flag, nonce, and 
start time + this._draining = (await this.ctx.storage.get('town:draining')) ?? false; + this._drainNonce = (await this.ctx.storage.get('town:drainNonce')) ?? null; + this._drainStartedAt = (await this.ctx.storage.get('town:drainStartedAt')) ?? null; + // All tables are now initialized via beads.initBeadTables(): // beads, bead_events, bead_dependencies, agent_metadata, review_metadata, // escalation_metadata, convoy_metadata @@ -537,6 +542,9 @@ export class TownDO extends DurableObject { private _townId: string | null = null; private _lastReconcilerMetrics: reconciler.ReconcilerMetrics | null = null; private _dashboardContext: string | null = null; + private _draining = false; + private _drainNonce: string | null = null; + private _drainStartedAt: number | null = null; private get townId(): string { return this._townId ?? this.ctx.id.name ?? this.ctx.id.toString(); @@ -563,6 +571,72 @@ export class TownDO extends DurableObject { return this._dashboardContext; } + // ══════════════════════════════════════════════════════════════════ + // Container Eviction (graceful drain) + // ══════════════════════════════════════════════════════════════════ + + /** + * Record a container eviction event and set the draining flag. + * Called by the container when it receives SIGTERM. While draining, + * the reconciler skips dispatch to prevent new work from starting. + * + * Returns a drain nonce that must be presented via + * `acknowledgeContainerReady()` to clear the drain flag. This + * prevents stale heartbeats from the dying container from + * prematurely re-enabling dispatch. 
+ */ + async recordContainerEviction(): Promise<string> { + events.insertEvent(this.sql, 'container_eviction', {}); + const nonce = crypto.randomUUID(); + const startedAt = Date.now(); + this._draining = true; + this._drainNonce = nonce; + this._drainStartedAt = startedAt; + await this.ctx.storage.put('town:draining', true); + await this.ctx.storage.put('town:drainNonce', nonce); + await this.ctx.storage.put('town:drainStartedAt', startedAt); + console.log(`${TOWN_LOG} recordContainerEviction: draining flag set, nonce=${nonce}`); + return nonce; + } + + /** + * Acknowledge that the replacement container is ready. Clears the + * draining flag only if the provided nonce matches the one generated + * during `recordContainerEviction()`. This ensures that only the + * new container (which received the nonce via startup config) can + * re-enable dispatch — not a stale heartbeat from the old container. + */ + async acknowledgeContainerReady(nonce: string): Promise<boolean> { + if (!this._draining) { + console.log(`${TOWN_LOG} acknowledgeContainerReady: not draining, noop`); + return true; + } + if (nonce !== this._drainNonce) { + console.warn( + `${TOWN_LOG} acknowledgeContainerReady: nonce mismatch (got=${nonce}, expected=${this._drainNonce})` + ); + return false; + } + this._draining = false; + this._drainNonce = null; + this._drainStartedAt = null; + await this.ctx.storage.put('town:draining', false); + await this.ctx.storage.delete('town:drainNonce'); + await this.ctx.storage.delete('town:drainStartedAt'); + console.log(`${TOWN_LOG} acknowledgeContainerReady: draining flag cleared`); + return true; + } + + /** Whether the town is in draining mode (container eviction in progress). */ + async isDraining(): Promise<boolean> { + return this._draining; + } + + /** The current drain nonce (null when not draining). 
*/ + async getDrainNonce(): Promise { + return this._drainNonce; + } + // ══════════════════════════════════════════════════════════════════ // Town Configuration // ══════════════════════════════════════════════════════════════════ @@ -1105,6 +1179,13 @@ export class TownDO extends DurableObject { // ── Heartbeat ───────────────────────────────────────────────────── + /** + * Update an agent's heartbeat timestamp. Returns the current drain + * nonce (if draining) so the caller can include it in the HTTP + * response without a second RPC — preventing a TOCTOU race where + * an in-flight heartbeat from the old container could observe a + * nonce generated between two separate DO calls. + */ async touchAgentHeartbeat( agentId: string, watermark?: { @@ -1112,9 +1193,10 @@ export class TownDO extends DurableObject { lastEventAt?: string | null; activeTools?: string[]; } - ): Promise { + ): Promise<{ drainNonce: string | null }> { agents.touchAgent(this.sql, agentId, watermark); await this.armAlarmIfNeeded(); + return { drainNonce: this._drainNonce }; } async updateAgentStatusMessage(agentId: string, message: string): Promise { @@ -3113,10 +3195,29 @@ export class TownDO extends DurableObject { Sentry.captureException(err); } + // Auto-clear drain flag if it has been active for too long. + // The drain sequence (drainAll) waits up to 10 minutes, so 15 + // minutes is a generous upper bound. After this timeout the old + // container is certainly dead and it is safe to resume dispatch. 
+ const DRAIN_TIMEOUT_MS = 15 * 60 * 1000; + if ( + this._draining && + this._drainStartedAt && + Date.now() - this._drainStartedAt > DRAIN_TIMEOUT_MS + ) { + this._draining = false; + this._drainNonce = null; + this._drainStartedAt = null; + await this.ctx.storage.put('town:draining', false); + await this.ctx.storage.delete('town:drainNonce'); + await this.ctx.storage.delete('town:drainStartedAt'); + logger.info('reconciler: drain timeout exceeded, auto-clearing draining flag'); + } + // Phase 1: Reconcile — compute desired state vs actual state const sideEffects: Array<() => Promise<void>> = []; try { - const actions = reconciler.reconcile(this.sql); + const actions = reconciler.reconcile(this.sql, { draining: this._draining }); metrics.actionsEmitted = actions.length; for (const a of actions) { metrics.actionsByType[a.type] = (metrics.actionsByType[a.type] ?? 0) + 1; @@ -3648,7 +3749,7 @@ if (!hasRigs) return; const hasWork = this.hasActiveWork(); - if (!hasWork) { + if (!hasWork && !this._draining) { const rigList = rigs.listRigs(this.sql); const newestRigAge = rigList.reduce((min, r) => { const age = Date.now() - new Date(r.created_at).getTime(); @@ -3663,8 +3764,27 @@ try { const container = getTownContainerStub(this.env, townId); + const headers: Record<string, string> = {}; + // When draining AND enough time has passed for the old container + // to have exited (drainAll waits up to 10 min + exit), pass the + // nonce so the replacement container can acknowledge readiness. + // We only send the nonce after 11 minutes to avoid the old + // (still-draining) container receiving it and clearing drain + // prematurely — the health check goes to whichever container is + // currently serving this town. 
+ const DRAIN_HANDOFF_DELAY_MS = 11 * 60 * 1000; + if ( + this._draining && + this._drainNonce && + this._drainStartedAt && + Date.now() - this._drainStartedAt > DRAIN_HANDOFF_DELAY_MS + ) { + headers['X-Drain-Nonce'] = this._drainNonce; + headers['X-Town-Id'] = townId; + } await container.fetch('http://container/health', { signal: AbortSignal.timeout(5_000), + headers, }); } catch { // Container is starting up or unavailable — alarm will retry diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index e9fef8d21..404d71d87 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -290,6 +290,13 @@ export function applyEvent(sql: SqlStorage, event: TownEventRecord): void { return; } + case 'container_eviction': { + // Draining flag is managed by the TownDO via KV storage. + // The reconciler reads it from there; no SQL state change needed here. + // The event is recorded for audit trail. + return; + } + case 'nudge_timeout': { // GUPP violations are handled by reconcileGUPP on the next pass. // The event just records the fact for audit trail. @@ -306,11 +313,12 @@ export function applyEvent(sql: SqlStorage, event: TownEventRecord): void { // Top-level reconcile // ════════════════════════════════════════════════════════════════════ -export function reconcile(sql: SqlStorage): Action[] { +export function reconcile(sql: SqlStorage, opts?: { draining?: boolean }): Action[] { + const draining = opts?.draining ?? 
false; const actions: Action[] = []; actions.push(...reconcileAgents(sql)); - actions.push(...reconcileBeads(sql)); - actions.push(...reconcileReviewQueue(sql)); + actions.push(...reconcileBeads(sql, { draining })); + actions.push(...reconcileReviewQueue(sql, { draining })); actions.push(...reconcileConvoys(sql)); actions.push(...reconcileGUPP(sql)); actions.push(...reconcileGC(sql)); @@ -457,7 +465,8 @@ export function reconcileAgents(sql: SqlStorage): Action[] { // reconcileBeads — handle unassigned beads, lost agents, stale reviews // ════════════════════════════════════════════════════════════════════ -export function reconcileBeads(sql: SqlStorage): Action[] { +export function reconcileBeads(sql: SqlStorage, opts?: { draining?: boolean }): Action[] { + const draining = opts?.draining ?? false; const actions: Action[] = []; // Rule 1: Open issue beads with no assignee, no blockers, not staged, not triage @@ -498,6 +507,10 @@ export function reconcileBeads(sql: SqlStorage): Action[] { for (const bead of unassigned) { if (!bead.rig_id) continue; + if (draining) { + console.log(`${LOG} Town is draining, skipping dispatch for bead ${bead.bead_id}`); + continue; + } // In shadow mode we can't call getOrCreateAgent, so we just note // that a hook_agent + dispatch_agent is needed. // The action includes rig_id so Phase 3's applyAction can resolve the agent. 
@@ -594,6 +607,13 @@ export function reconcileBeads(sql: SqlStorage): Action[] { if (blockerCount[0]?.cnt > 0) continue; + if (draining) { + console.log( + `${LOG} Town is draining, skipping dispatch for bead ${agent.current_hook_bead_id}` + ); + continue; + } + actions.push({ type: 'dispatch_agent', agent_id: agent.bead_id, @@ -743,7 +763,8 @@ export function reconcileBeads(sql: SqlStorage): Action[] { // refinery dispatch // ════════════════════════════════════════════════════════════════════ -export function reconcileReviewQueue(sql: SqlStorage): Action[] { +export function reconcileReviewQueue(sql: SqlStorage, opts?: { draining?: boolean }): Action[] { + const draining = opts?.draining ?? false; const actions: Action[] = []; // Get all MR beads that need attention @@ -933,6 +954,12 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { if (oldestMr.length === 0) continue; + // Skip dispatch if the town is draining (container eviction in progress) + if (draining) { + console.log(`${LOG} Town is draining, skipping dispatch for bead ${oldestMr[0].bead_id}`); + continue; + } + // If no refinery exists or it's busy, emit a dispatch_agent with empty // agent_id — applyAction will create the refinery via getOrCreateAgent. if (refinery.length === 0) { @@ -1044,6 +1071,13 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { const mr = mrRows[0]; if (mr.type !== 'merge_request' || mr.status !== 'in_progress') continue; + if (draining) { + console.log( + `${LOG} Town is draining, skipping dispatch for bead ${ref.current_hook_bead_id}` + ); + continue; + } + // Container status is checked at apply time (async). In shadow mode, // we just note that a dispatch is needed. 
actions.push({ diff --git a/cloudflare-gastown/src/gastown.worker.ts b/cloudflare-gastown/src/gastown.worker.ts index 32d676e0a..2f0738c86 100644 --- a/cloudflare-gastown/src/gastown.worker.ts +++ b/cloudflare-gastown/src/gastown.worker.ts @@ -134,6 +134,11 @@ import { handleListEscalations, handleAcknowledgeEscalation, } from './handlers/town-escalations.handler'; +import { + handleContainerEviction, + handleContainerReady, + handleDrainStatus, +} from './handlers/town-eviction.handler'; export { GastownUserDO } from './dos/GastownUser.do'; export { GastownOrgDO } from './dos/GastownOrg.do'; @@ -478,6 +483,27 @@ app.post('/api/towns/:townId/rigs/:rigId/triage/resolve', c => ) ); +// ── Container Eviction ────────────────────────────────────────────────── +// Called by the container on SIGTERM. Uses container JWT auth (not kilo +// user auth), so it must be registered before the kiloAuthMiddleware +// wildcard below. + +app.post('/api/towns/:townId/container-eviction', c => + instrumented(c, 'POST /api/towns/:townId/container-eviction', () => + handleContainerEviction(c, c.req.param()) + ) +); + +app.post('/api/towns/:townId/container-ready', c => + instrumented(c, 'POST /api/towns/:townId/container-ready', () => + handleContainerReady(c, c.req.param()) + ) +); + +app.get('/api/towns/:townId/drain-status', c => + instrumented(c, 'GET /api/towns/:townId/drain-status', () => handleDrainStatus(c, c.req.param())) +); + // ── Kilo User Auth ────────────────────────────────────────────────────── // Validate Kilo user JWT (signed with NEXTAUTH_SECRET) for dashboard/user // routes. 
Container→worker routes use the agent JWT middleware instead diff --git a/cloudflare-gastown/src/handlers/rig-agents.handler.ts b/cloudflare-gastown/src/handlers/rig-agents.handler.ts index 24c429a4c..f1dd8062a 100644 --- a/cloudflare-gastown/src/handlers/rig-agents.handler.ts +++ b/cloudflare-gastown/src/handlers/rig-agents.handler.ts @@ -221,7 +221,11 @@ export async function handleHeartbeat( // No body or invalid JSON — old container format, just touch } - await town.touchAgentHeartbeat( + // touchAgentHeartbeat returns the drain nonce atomically — no + // second RPC needed, which prevents a TOCTOU race where an + // in-flight heartbeat from the old container could observe a nonce + // generated between two separate DO calls. + const { drainNonce } = await town.touchAgentHeartbeat( params.agentId, watermark ? { @@ -232,7 +236,7 @@ export async function handleHeartbeat( : undefined ); - return c.json(resSuccess({ heartbeat: true })); + return c.json(resSuccess({ heartbeat: true, ...(drainNonce ? { drainNonce } : {}) })); } const GetOrCreateAgentBody = z.object({ diff --git a/cloudflare-gastown/src/handlers/town-eviction.handler.ts b/cloudflare-gastown/src/handlers/town-eviction.handler.ts new file mode 100644 index 000000000..b92e6a502 --- /dev/null +++ b/cloudflare-gastown/src/handlers/town-eviction.handler.ts @@ -0,0 +1,142 @@ +import type { Context } from 'hono'; +import { z } from 'zod'; +import { extractBearerToken } from '@kilocode/worker-utils'; +import type { GastownEnv } from '../gastown.worker'; +import { getTownDOStub } from '../dos/Town.do'; +import { verifyContainerJWT } from '../util/jwt.util'; +import { resolveSecret } from '../util/secret.util'; +import { resSuccess, resError } from '../util/res.util'; + +/** + * POST /api/towns/:townId/container-eviction + * + * Called by the container's process-manager when the container receives + * SIGTERM. 
Inserts a `container_eviction` event and sets the draining + * flag so the reconciler stops dispatching new work. + * + * Returns a `drainNonce` that must be presented via `/container-ready` + * to clear the drain flag. This prevents stale heartbeats from the + * dying container from prematurely re-enabling dispatch. + * + * Authenticated with the container-scoped JWT (same token used for all + * container→worker calls). + */ +export async function handleContainerEviction( + c: Context, + params: { townId: string } +): Promise { + // Authenticate with container JWT + const token = extractBearerToken(c.req.header('Authorization')); + if (!token) { + return c.json(resError('Authentication required'), 401); + } + + const secret = await resolveSecret(c.env.GASTOWN_JWT_SECRET); + if (!secret) { + console.error('[town-eviction] failed to resolve GASTOWN_JWT_SECRET'); + return c.json(resError('Internal server error'), 500); + } + + const result = verifyContainerJWT(token, secret); + if (!result.success) { + return c.json(resError(result.error), 401); + } + + // Cross-town guard + if (result.payload.townId !== params.townId) { + return c.json(resError('Cross-town access denied'), 403); + } + + const town = getTownDOStub(c.env, params.townId); + const drainNonce = await town.recordContainerEviction(); + + console.log(`[town-eviction] container eviction recorded for town=${params.townId}`); + return c.json(resSuccess({ acknowledged: true, drainNonce }), 200); +} + +/** + * GET /api/towns/:townId/drain-status + * + * Lightweight endpoint for the container to poll drain state. Used by + * the heartbeat module when no agents are running — the per-agent + * heartbeat loop has nothing to iterate, so a separate check is needed + * to discover the drain nonce and call /container-ready. + * + * Authenticated with the container-scoped JWT. 
+ */ +export async function handleDrainStatus( + c: Context, + params: { townId: string } +): Promise { + const token = extractBearerToken(c.req.header('Authorization')); + if (!token) { + return c.json(resError('Authentication required'), 401); + } + + const secret = await resolveSecret(c.env.GASTOWN_JWT_SECRET); + if (!secret) { + return c.json(resError('Internal server error'), 500); + } + + const result = verifyContainerJWT(token, secret); + if (!result.success) { + return c.json(resError(result.error), 401); + } + + if (result.payload.townId !== params.townId) { + return c.json(resError('Cross-town access denied'), 403); + } + + const town = getTownDOStub(c.env, params.townId); + const [draining, drainNonce] = await Promise.all([town.isDraining(), town.getDrainNonce()]); + + return c.json(resSuccess({ draining, drainNonce }), 200); +} + +/** + * POST /api/towns/:townId/container-ready + * + * Called by the replacement container on startup to signal readiness. + * Clears the draining flag only if the provided `drainNonce` matches + * the nonce generated during the eviction that triggered the drain. + * + * Authenticated with the container-scoped JWT. 
+ */ +export async function handleContainerReady( + c: Context, + params: { townId: string } +): Promise { + const token = extractBearerToken(c.req.header('Authorization')); + if (!token) { + return c.json(resError('Authentication required'), 401); + } + + const secret = await resolveSecret(c.env.GASTOWN_JWT_SECRET); + if (!secret) { + console.error('[container-ready] failed to resolve GASTOWN_JWT_SECRET'); + return c.json(resError('Internal server error'), 500); + } + + const result = verifyContainerJWT(token, secret); + if (!result.success) { + return c.json(resError(result.error), 401); + } + + if (result.payload.townId !== params.townId) { + return c.json(resError('Cross-town access denied'), 403); + } + + const ContainerReadyBody = z.object({ nonce: z.string() }); + + const parsed = ContainerReadyBody.safeParse(await c.req.json().catch(() => null)); + if (!parsed.success) { + return c.json(resError('Missing required field: nonce'), 400); + } + const { nonce } = parsed.data; + + const town = getTownDOStub(c.env, params.townId); + const cleared = await town.acknowledgeContainerReady(nonce); + + console.log(`[container-ready] town=${params.townId} nonce=${nonce} cleared=${cleared}`); + return c.json(resSuccess({ cleared }), 200); +} diff --git a/cloudflare-gastown/test/integration/reconciler.test.ts b/cloudflare-gastown/test/integration/reconciler.test.ts index e5e286924..e46e7cae3 100644 --- a/cloudflare-gastown/test/integration/reconciler.test.ts +++ b/cloudflare-gastown/test/integration/reconciler.test.ts @@ -485,5 +485,18 @@ describe('Reconciler', () => { const statusAfter = await town.getConvoyStatus(result.convoy.id); expect(statusAfter?.staged).toBe(false); }); + + it('should process container_eviction event without error', async () => { + // Record a container eviction (inserts a container_eviction event) + await town.recordContainerEviction(); + + // Run alarm — the reconciler should drain the container_eviction + // event and process it as a no-op 
(audit-only) without throwing. + await runDurableObjectAlarm(town); + + // The draining flag should still be true (the reconciler doesn't + // clear it — only a heartbeat does). + expect(await town.isDraining()).toBe(true); + }); }); }); diff --git a/cloudflare-gastown/test/integration/town-container.test.ts b/cloudflare-gastown/test/integration/town-container.test.ts index 0849168b9..f9313f052 100644 --- a/cloudflare-gastown/test/integration/town-container.test.ts +++ b/cloudflare-gastown/test/integration/town-container.test.ts @@ -119,3 +119,90 @@ describe('Town DO — touchAgentHeartbeat', () => { expect(updated!.last_activity_at).not.toBe(initialActivity); }); }); + +// ── Container Eviction (draining lifecycle) ──────────────────────────── + +describe('Town DO — container eviction draining lifecycle', () => { + it('recordContainerEviction() sets draining flag', async () => { + const id = `town-${crypto.randomUUID()}`; + const town = env.TOWN.get(env.TOWN.idFromName(id)); + + // Before eviction, isDraining should be false + expect(await town.isDraining()).toBe(false); + + // Record eviction + await town.recordContainerEviction(); + + // isDraining should now be true + expect(await town.isDraining()).toBe(true); + }); + + it('acknowledgeContainerReady() clears draining flag with correct nonce', async () => { + const id = `town-${crypto.randomUUID()}`; + const town = env.TOWN.get(env.TOWN.idFromName(id)); + + // Set draining flag and capture the nonce + const drainNonce = await town.recordContainerEviction(); + expect(await town.isDraining()).toBe(true); + + // Acknowledge with the correct nonce clears the drain flag + const cleared = await town.acknowledgeContainerReady(drainNonce); + expect(cleared).toBe(true); + expect(await town.isDraining()).toBe(false); + }); + + it('acknowledgeContainerReady() rejects wrong nonce and stays draining', async () => { + const id = `town-${crypto.randomUUID()}`; + const town = env.TOWN.get(env.TOWN.idFromName(id)); + + // Set 
draining flag + await town.recordContainerEviction(); + expect(await town.isDraining()).toBe(true); + + // Wrong nonce should not clear the drain flag + const cleared = await town.acknowledgeContainerReady('wrong-nonce'); + expect(cleared).toBe(false); + expect(await town.isDraining()).toBe(true); + }); + + it('touchAgentHeartbeat() returns drainNonce during eviction', async () => { + const id = `town-${crypto.randomUUID()}`; + const town = env.TOWN.get(env.TOWN.idFromName(id)); + + // Register an agent so heartbeat has a valid target + const agent = await town.registerAgent({ + role: 'polecat', + name: 'drain-nonce-test', + identity: `drain-nonce-${id}`, + }); + + // Set draining flag + const drainNonce = await town.recordContainerEviction(); + expect(await town.isDraining()).toBe(true); + + // Heartbeat returns the drainNonce (but does NOT clear draining) + const heartbeatResult = await town.touchAgentHeartbeat(agent.id); + expect(heartbeatResult.drainNonce).toBe(drainNonce); + expect(await town.isDraining()).toBe(true); + + // Only acknowledgeContainerReady with the nonce clears it + const cleared = await town.acknowledgeContainerReady(drainNonce); + expect(cleared).toBe(true); + expect(await town.isDraining()).toBe(false); + }); + + it('draining flag persists across re-initialization', async () => { + const id = `town-${crypto.randomUUID()}`; + const town = env.TOWN.get(env.TOWN.idFromName(id)); + + // Record eviction on the first stub reference + await town.recordContainerEviction(); + expect(await town.isDraining()).toBe(true); + + // Get a fresh stub reference to the same DO (same name = same instance). + // The DO's blockConcurrencyWhile loads town:draining from KV on init, + // so the draining flag should survive. 
+ const town2 = env.TOWN.get(env.TOWN.idFromName(id)); + expect(await town2.isDraining()).toBe(true); + }); +}); diff --git a/src/app/(app)/claw/components/ClawHeader.tsx b/src/app/(app)/claw/components/ClawHeader.tsx index 0f7ccfc9d..ca9c827d0 100644 --- a/src/app/(app)/claw/components/ClawHeader.tsx +++ b/src/app/(app)/claw/components/ClawHeader.tsx @@ -28,7 +28,6 @@ export function ClawHeader({ title="KiloClaw" icon={} > - Beta {statusInfo && ( {statusInfo.label} diff --git a/src/app/admin/api/safety-identifiers/route.ts b/src/app/admin/api/safety-identifiers/route.ts new file mode 100644 index 000000000..c9cb74867 --- /dev/null +++ b/src/app/admin/api/safety-identifiers/route.ts @@ -0,0 +1,76 @@ +import { NextResponse } from 'next/server'; +import { getUserFromAuth } from '@/lib/user.server'; +import { db } from '@/lib/drizzle'; +import { kilocode_users } from '@kilocode/db'; +import { + generateOpenRouterUpstreamSafetyIdentifier, + generateVercelDownstreamSafetyIdentifier, +} from '@/lib/providerHash'; +import { isNull, count, or, desc, eq } from 'drizzle-orm'; + +const missingEither = or( + isNull(kilocode_users.openrouter_upstream_safety_identifier), + isNull(kilocode_users.vercel_downstream_safety_identifier) +); + +export type SafetyIdentifierCountsResponse = { + missing: number; +}; + +export type BackfillBatchResponse = { + processed: number; + remaining: boolean; +}; + +export async function GET(): Promise< + NextResponse +> { + const { authFailedResponse } = await getUserFromAuth({ adminOnly: true }); + if (authFailedResponse) return authFailedResponse; + + const [result] = await db.select({ count: count() }).from(kilocode_users).where(missingEither); + + return NextResponse.json({ missing: result?.count ?? 
0 }); +} + +export async function POST(): Promise> { + const { authFailedResponse } = await getUserFromAuth({ adminOnly: true }); + if (authFailedResponse) return authFailedResponse; + + const processed = await db.transaction(async tran => { + const rows = await tran + .select({ id: kilocode_users.id }) + .from(kilocode_users) + .where(missingEither) + .orderBy(desc(kilocode_users.created_at)) + .limit(1000); + + for (const user of rows) { + const openrouter_upstream_safety_identifier = generateOpenRouterUpstreamSafetyIdentifier( + user.id + ); + if (openrouter_upstream_safety_identifier === null) { + return null; + } + await tran + .update(kilocode_users) + .set({ + openrouter_upstream_safety_identifier, + vercel_downstream_safety_identifier: generateVercelDownstreamSafetyIdentifier(user.id), + }) + .where(eq(kilocode_users.id, user.id)) + .execute(); + } + + return rows.length; + }); + + if (processed === null) { + return NextResponse.json( + { error: 'OPENROUTER_ORG_ID is not configured on this server' }, + { status: 500 } + ); + } + + return NextResponse.json({ processed, remaining: processed === 1000 }); +} diff --git a/src/app/admin/components/AppSidebar.tsx b/src/app/admin/components/AppSidebar.tsx index 0fc8480f8..8eddc3788 100644 --- a/src/app/admin/components/AppSidebar.tsx +++ b/src/app/admin/components/AppSidebar.tsx @@ -23,6 +23,7 @@ import { Bell, Server, Network, + KeyRound, } from 'lucide-react'; import { useSession } from 'next-auth/react'; import type { Session } from 'next-auth'; @@ -79,6 +80,11 @@ const userManagementItems: MenuItem[] = [ url: '/admin/blacklisted-domains', icon: () => , }, + { + title: () => 'Safety Identifiers', + url: '/admin/safety-identifiers', + icon: () => , + }, ]; const financialItems: MenuItem[] = [ diff --git a/src/app/admin/components/SafetyIdentifiersBackfill.tsx b/src/app/admin/components/SafetyIdentifiersBackfill.tsx new file mode 100644 index 000000000..5c7b8ed64 --- /dev/null +++ 
b/src/app/admin/components/SafetyIdentifiersBackfill.tsx @@ -0,0 +1,99 @@ +'use client'; + +import { useState } from 'react'; +import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; +import { Button } from '@/components/ui/button'; +import { Alert, AlertDescription } from '@/components/ui/alert'; +import { Badge } from '@/components/ui/badge'; +import type { + SafetyIdentifierCountsResponse, + BackfillBatchResponse, +} from '../api/safety-identifiers/route'; + +type BatchLog = { + processed: number; + timestamp: Date; +}; + +export function SafetyIdentifiersBackfill() { + const [logs, setLogs] = useState([]); + const queryClient = useQueryClient(); + + const { data: counts, isLoading } = useQuery({ + queryKey: ['safety-identifier-counts'], + queryFn: async () => { + const res = await fetch('/admin/api/safety-identifiers'); + return res.json() as Promise; + }, + refetchInterval: false, + }); + + const mutation = useMutation({ + mutationFn: async () => { + const res = await fetch('/admin/api/safety-identifiers', { method: 'POST' }); + if (!res.ok) { + const body = (await res.json()) as { error?: string }; + throw new Error(body.error ?? `HTTP ${res.status}`); + } + return res.json() as Promise; + }, + onSuccess: data => { + setLogs(prev => [{ processed: data.processed, timestamp: new Date() }, ...prev]); + void queryClient.invalidateQueries({ queryKey: ['safety-identifier-counts'] }); + }, + }); + + const isDone = counts?.missing === 0; + + return ( +
+

+ Backfill safety identifiers for users missing either field. Each click processes up to 1 000 + users. Click repeatedly until the counter reaches zero. +

+ +
+
+ Users missing a safety identifier + {isLoading ? ( + Loading… + ) : isDone ? ( + + All filled + + ) : ( + {(counts?.missing ?? 0).toLocaleString()} missing + )} +
+ + {mutation.isError && ( + + {mutation.error.message} + + )} + + +
+ + {logs.length > 0 && ( +
+

Batch log

+
+ {logs.map((log, i) => ( +
+ {log.timestamp.toLocaleTimeString()} + processed {log.processed.toLocaleString()} users +
+ ))} +
+
+ )} +
+ ); +} diff --git a/src/app/admin/components/UserAdmin/UserAdminAccountInfo.tsx b/src/app/admin/components/UserAdmin/UserAdminAccountInfo.tsx index 3566922e2..f60db2173 100644 --- a/src/app/admin/components/UserAdmin/UserAdminAccountInfo.tsx +++ b/src/app/admin/components/UserAdmin/UserAdminAccountInfo.tsx @@ -108,6 +108,21 @@ export function UserAdminAccountInfo(user: UserAdminAccountInfoProps) {

N/A

)} +
+

+ Vercel Downstream Safety Identifier +

+ {user.vercel_downstream_safety_identifier ? ( +
+

+ {user.vercel_downstream_safety_identifier} +

+ +
+ ) : ( +

N/A

+ )} +
diff --git a/src/app/admin/safety-identifiers/page.tsx b/src/app/admin/safety-identifiers/page.tsx new file mode 100644 index 000000000..f0ed5dbb0 --- /dev/null +++ b/src/app/admin/safety-identifiers/page.tsx @@ -0,0 +1,24 @@ +import { SafetyIdentifiersBackfill } from '../components/SafetyIdentifiersBackfill'; +import AdminPage from '../components/AdminPage'; +import { BreadcrumbItem, BreadcrumbPage } from '@/components/ui/breadcrumb'; + +const breadcrumbs = ( + <> + + Safety Identifiers + + +); + +export default function SafetyIdentifiersPage() { + return ( + +
+
+

Safety Identifier Backfill

+
+ +
+
+ ); +} diff --git a/src/lib/providers/gateway-error-rate.ts b/src/lib/providers/gateway-error-rate.ts index 8dbcdb64e..54bde1025 100644 --- a/src/lib/providers/gateway-error-rate.ts +++ b/src/lib/providers/gateway-error-rate.ts @@ -7,14 +7,15 @@ const getGatewayErrorRate_cached = unstable_cache( console.debug(`[getGatewayErrorRate_cached] refreshing at ${new Date().toISOString()}`); const { rows } = await db.execute(sql` select - provider as "gateway", - 1.0 * count(*) filter(where has_error = true) / count(*) as "errorRate" - from microdollar_usage_view + mu.provider as "gateway", + 1.0 * count(*) filter(where mu.has_error = true) / count(*) as "errorRate" + from microdollar_usage mu + join microdollar_usage_metadata meta on mu.id = meta.id where true - and created_at >= now() - interval '10 minutes' - and is_user_byok = false - and provider in ('openrouter', 'vercel') - group by provider + and mu.created_at >= now() - interval '10 minutes' + and meta.is_user_byok = false + and mu.provider in ('openrouter', 'vercel') + group by mu.provider `); return z .array( diff --git a/src/lib/user.test.ts b/src/lib/user.test.ts index 04109844e..612a005dc 100644 --- a/src/lib/user.test.ts +++ b/src/lib/user.test.ts @@ -92,6 +92,7 @@ describe('User', () => { linkedin_url: 'https://linkedin.com/in/testuser', github_url: 'https://github.com/testuser', openrouter_upstream_safety_identifier: 'openrouter_upstream_safety_identifier', + vercel_downstream_safety_identifier: 'vercel_downstream_safety_identifier', customer_source: 'A YouTube video', is_admin: true, }); @@ -108,6 +109,7 @@ describe('User', () => { expect(softDeleted!.github_url).toBeNull(); expect(softDeleted!.discord_server_membership_verified_at).toBeNull(); expect(softDeleted!.openrouter_upstream_safety_identifier).toBeNull(); + expect(softDeleted!.vercel_downstream_safety_identifier).toBeNull(); expect(softDeleted!.customer_source).toBeNull(); expect(softDeleted!.api_token_pepper).toBeNull(); 
expect(softDeleted!.default_model).toBeNull(); diff --git a/src/scripts/openrouter/backfill-safety-identifier.ts b/src/scripts/openrouter/backfill-safety-identifier.ts deleted file mode 100644 index c8e1f690c..000000000 --- a/src/scripts/openrouter/backfill-safety-identifier.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { db } from '@/lib/drizzle'; -import { generateOpenRouterUpstreamSafetyIdentifier } from '@/lib/providerHash'; -import { kilocode_users } from '@kilocode/db'; -import { isNull, desc, eq } from 'drizzle-orm'; - -export async function run() { - while (true) { - const count = await db.transaction(async tran => { - const rows = await tran - .select({ - id: kilocode_users.id, - }) - .from(kilocode_users) - .where(isNull(kilocode_users.openrouter_upstream_safety_identifier)) - .orderBy(desc(kilocode_users.created_at)) - .limit(1000); - if (rows.length === 0) { - return 0; - } - console.log(`Batch of ${rows.length} users`); - for (const user of rows) { - const openrouter_upstream_safety_identifier = generateOpenRouterUpstreamSafetyIdentifier( - user.id - ); - await tran - .update(kilocode_users) - .set({ - openrouter_upstream_safety_identifier, - }) - .where(eq(kilocode_users.id, user.id)) - .execute(); - } - console.log('Commit'); - return rows.length; - }); - if (count === 0) { - break; - } - } -}