From fe40c78ae3d315eb1724ca1b60e7aee262a18680 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 31 Mar 2026 19:36:45 -0500 Subject: [PATCH 1/9] feat(gastown): suppress dispatch_agent actions when town is draining (#1797) * feat(gastown): add container_eviction event type and /container-eviction endpoint Add foundation for Tier 1.5 graceful container eviction: - Add container_eviction to TownEventType enum - Add POST /api/towns/:townId/container-eviction route with container JWT auth for the container to signal SIGTERM receipt - Add recordContainerEviction() RPC on TownDO that inserts the event and sets a draining flag in DO KV storage - Add isDraining() RPC for reconciler consumption - Clear draining flag on agent heartbeat (container alive after restart) - Handle container_eviction in reconciler applyEvent (no-op, audit only) * feat(gastown): skip dispatch_agent actions when town is draining Add draining flag check to reconciler dispatch rules for Tier 1.5 graceful container eviction. When the town is in draining state (container eviction in progress), all dispatch_agent emissions are suppressed with a log message. Non-dispatch rules (agent health, convoy progress, GUPP, GC) continue to function normally during drain. Guards added to: - reconcileBeads Rule 1 (unassigned open beads) - reconcileBeads Rule 2 (idle agents with hooks) - reconcileReviewQueue Rule 5 (refinery dispatch for open MRs) - reconcileReviewQueue Rule 6 (idle refinery re-dispatch) * style: apply oxfmt formatting to reconciler * fix(gastown): replace heartbeat drain clearing with nonce-based /container-ready acknowledgment Heartbeats from the old (draining) container could race with the eviction signal and prematurely clear the drain flag, re-enabling dispatch into a container that was shutting down. Changes: - recordContainerEviction() now generates a drain nonce and returns it - New acknowledgeContainerReady(nonce) method validates the nonce before clearing drain - Heartbeat responses include the drainNonce when draining, so the replacement container can call /container-ready - New POST /container-ready worker endpoint - Drain auto-expires after 15 minutes as a safety net - Container heartbeat module detects drainNonce in responses and calls /container-ready to clear drain on the replacement container * fix(gastown): fix drain nonce race and idle container readiness Two issues fixed: 1. Heartbeat nonce TOCTOU race: touchAgentHeartbeat() now returns the drain nonce atomically in a single DO call instead of requiring a separate getDrainNonce() RPC. This prevents an in-flight heartbeat from the old container from observing a nonce generated between two separate calls. 2. Idle container readiness: ensureContainerReady() now passes X-Drain-Nonce and X-Town-Id headers to the container health check when draining. The container's /health endpoint reads these and calls /container-ready, handling the case where a replacement container has no running agents and the per-agent heartbeat loop has nothing to iterate. Also adds GET /drain-status endpoint for debugging. * fix(gastown): delay drain nonce handoff via health check until old container exits ensureContainerReady() talks to whichever container is currently serving this town. During drain, that is still the old container. Passing X-Drain-Nonce immediately would let the old container clear drain before the replacement is up. Now the nonce is only passed via health check headers after 11 minutes (beyond the 10-min drainAll wait + exit), ensuring the old container has exited before the handoff. * fix(gastown): skip ensureContainerReady early-return when draining ensureContainerReady() bails out early when there is no active work and the rig is not recently configured. This prevented the drain nonce handoff from ever reaching the replacement container in idle towns. Now the draining flag bypasses that early-return so the health check (with X-Drain-Nonce headers) always fires. --- .../container/src/control-server.ts | 11 +- cloudflare-gastown/container/src/heartbeat.ts | 63 +++++++ .../src/db/tables/town-events.table.ts | 1 + cloudflare-gastown/src/dos/Town.do.ts | 126 +++++++++++++- cloudflare-gastown/src/dos/town/reconciler.ts | 44 ++++- cloudflare-gastown/src/gastown.worker.ts | 28 ++++ .../src/handlers/rig-agents.handler.ts | 8 +- .../src/handlers/town-eviction.handler.ts | 155 ++++++++++++++++++ 8 files changed, 425 insertions(+), 11 deletions(-) create mode 100644 cloudflare-gastown/src/handlers/town-eviction.handler.ts diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index 70ba58c62..86b1d5c8d 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -13,7 +13,7 @@ import { getAgentEvents, registerEventSink, } from './process-manager'; -import { startHeartbeat, stopHeartbeat } from './heartbeat'; +import { startHeartbeat, stopHeartbeat, notifyContainerReady } from './heartbeat'; import { pushContext as pushDashboardContext } from './dashboard-context'; import { mergeBranch, setupRigBrowseWorktree } from './git-manager'; import { @@ -92,6 +92,15 @@ app.use('*', async (c, next) => { // GET /health app.get('/health', c => { + // When the TownDO is draining, it passes the drain nonce and town + // ID via headers so idle containers (no running agents) can + // acknowledge readiness and clear the drain flag. + const drainNonce = c.req.header('X-Drain-Nonce'); + const townId = c.req.header('X-Town-Id'); + if (drainNonce && townId) { + void notifyContainerReady(townId, drainNonce); + } + const response: HealthResponse = { status: 'ok', agents: activeAgentCount(), diff --git a/cloudflare-gastown/container/src/heartbeat.ts b/cloudflare-gastown/container/src/heartbeat.ts index bd9dd8db3..defbe7897 100644 --- a/cloudflare-gastown/container/src/heartbeat.ts +++ b/cloudflare-gastown/container/src/heartbeat.ts @@ -6,6 +6,8 @@ const HEARTBEAT_INTERVAL_MS = 30_000; let heartbeatTimer: ReturnType | null = null; let gastownApiUrl: string | null = null; let sessionToken: string | null = null; +/** Set once we've successfully acknowledged container-ready. */ +let containerReadyAcknowledged = false; /** * Configure and start the heartbeat reporter. @@ -38,6 +40,49 @@ export function stopHeartbeat(): void { console.log('Heartbeat reporter stopped'); } +/** + * Notify the TownDO that the replacement container is ready. + * Exported so the health endpoint can trigger it when the TownDO + * passes the drain nonce via headers (handles idle containers that + * have no running agents and thus no per-agent heartbeats). + */ +export async function notifyContainerReady(townId: string, drainNonce: string): Promise { + if (containerReadyAcknowledged) return; + await acknowledgeContainerReady(townId, drainNonce); +} + +/** + * Call POST /container-ready to acknowledge that this is a fresh + * container replacing an evicted one. Clears the TownDO drain flag + * so the reconciler can resume dispatching. + */ +async function acknowledgeContainerReady(townId: string, drainNonce: string): Promise { + const apiUrl = gastownApiUrl ?? process.env.GASTOWN_API_URL; + const currentToken = process.env.GASTOWN_CONTAINER_TOKEN ?? sessionToken; + if (!apiUrl || !currentToken) return; + + try { + const response = await fetch(`${apiUrl}/api/towns/${townId}/container-ready`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${currentToken}`, + }, + body: JSON.stringify({ nonce: drainNonce }), + }); + if (response.ok) { + containerReadyAcknowledged = true; + console.log(`[heartbeat] container-ready acknowledged for town=${townId}`); + } else { + console.warn( + `[heartbeat] container-ready failed for town=${townId}: ${response.status} ${response.statusText}` + ); + } + } catch (err) { + console.warn(`[heartbeat] container-ready error for town=${townId}:`, err); + } +} + async function sendHeartbeats(): Promise { // Prefer the live container token (refreshed via POST /refresh-token) // over the token captured at startHeartbeat() time. @@ -46,6 +91,12 @@ async function sendHeartbeats(): Promise { const active = listAgents().filter(a => a.status === 'running' || a.status === 'starting'); + // When no agents are active, the per-agent heartbeat loop has + // nothing to send. Idle container drain acknowledgment is handled + // by the /health endpoint instead (the TownDO passes the nonce via + // X-Drain-Nonce headers in ensureContainerReady). + if (active.length === 0) return; + for (const agent of active) { const payload: HeartbeatPayload = { agentId: agent.agentId, @@ -77,6 +128,18 @@ async function sendHeartbeats(): Promise { console.warn( `Heartbeat failed for agent ${agent.agentId}: ${response.status} ${response.statusText}` ); + } else if (!containerReadyAcknowledged) { + // If the TownDO is draining, the heartbeat response includes a + // drainNonce. Use it to call /container-ready and clear drain. + try { + const body = (await response.json()) as { data?: { drainNonce?: string } }; + const nonce = body?.data?.drainNonce; + if (nonce) { + void acknowledgeContainerReady(agent.townId, nonce); + } + } catch { + // Non-JSON or unexpected shape — ignore + } } } catch (err) { console.warn(`Heartbeat error for agent ${agent.agentId}:`, err); diff --git a/cloudflare-gastown/src/db/tables/town-events.table.ts b/cloudflare-gastown/src/db/tables/town-events.table.ts index 95309e7a0..30be09c65 100644 --- a/cloudflare-gastown/src/db/tables/town-events.table.ts +++ b/cloudflare-gastown/src/db/tables/town-events.table.ts @@ -5,6 +5,7 @@ export const TownEventType = z.enum([ 'agent_done', 'agent_completed', 'container_status', + 'container_eviction', 'pr_status_changed', 'bead_created', 'bead_cancelled', diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index b3eeaa158..7cccd35ad 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -504,6 +504,11 @@ export class TownDO extends DurableObject { const townConfig = await config.getTownConfig(this.ctx.storage); this._ownerUserId = townConfig.owner_user_id; + // Load persisted draining flag, nonce, and start time + this._draining = (await this.ctx.storage.get('town:draining')) ?? false; + this._drainNonce = (await this.ctx.storage.get('town:drainNonce')) ?? null; + this._drainStartedAt = (await this.ctx.storage.get('town:drainStartedAt')) ?? null; + // All tables are now initialized via beads.initBeadTables(): // beads, bead_events, bead_dependencies, agent_metadata, review_metadata, // escalation_metadata, convoy_metadata @@ -537,6 +542,9 @@ export class TownDO extends DurableObject { private _townId: string | null = null; private _lastReconcilerMetrics: reconciler.ReconcilerMetrics | null = null; private _dashboardContext: string | null = null; + private _draining = false; + private _drainNonce: string | null = null; + private _drainStartedAt: number | null = null; private get townId(): string { return this._townId ?? this.ctx.id.name ?? this.ctx.id.toString(); @@ -563,6 +571,72 @@ export class TownDO extends DurableObject { return this._dashboardContext; } + // ══════════════════════════════════════════════════════════════════ + // Container Eviction (graceful drain) + // ══════════════════════════════════════════════════════════════════ + + /** + * Record a container eviction event and set the draining flag. + * Called by the container when it receives SIGTERM. While draining, + * the reconciler skips dispatch to prevent new work from starting. + * + * Returns a drain nonce that must be presented via + * `acknowledgeContainerReady()` to clear the drain flag. This + * prevents stale heartbeats from the dying container from + * prematurely re-enabling dispatch. + */ + async recordContainerEviction(): Promise { + events.insertEvent(this.sql, 'container_eviction', {}); + const nonce = crypto.randomUUID(); + const startedAt = Date.now(); + this._draining = true; + this._drainNonce = nonce; + this._drainStartedAt = startedAt; + await this.ctx.storage.put('town:draining', true); + await this.ctx.storage.put('town:drainNonce', nonce); + await this.ctx.storage.put('town:drainStartedAt', startedAt); + console.log(`${TOWN_LOG} recordContainerEviction: draining flag set, nonce=${nonce}`); + return nonce; + } + + /** + * Acknowledge that the replacement container is ready. Clears the + * draining flag only if the provided nonce matches the one generated + * during `recordContainerEviction()`. This ensures that only the + * new container (which received the nonce via startup config) can + * re-enable dispatch — not a stale heartbeat from the old container. + */ + async acknowledgeContainerReady(nonce: string): Promise { + if (!this._draining) { + console.log(`${TOWN_LOG} acknowledgeContainerReady: not draining, noop`); + return true; + } + if (nonce !== this._drainNonce) { + console.warn( + `${TOWN_LOG} acknowledgeContainerReady: nonce mismatch (got=${nonce}, expected=${this._drainNonce})` + ); + return false; + } + this._draining = false; + this._drainNonce = null; + this._drainStartedAt = null; + await this.ctx.storage.put('town:draining', false); + await this.ctx.storage.delete('town:drainNonce'); + await this.ctx.storage.delete('town:drainStartedAt'); + console.log(`${TOWN_LOG} acknowledgeContainerReady: draining flag cleared`); + return true; + } + + /** Whether the town is in draining mode (container eviction in progress). */ + async isDraining(): Promise { + return this._draining; + } + + /** The current drain nonce (null when not draining). */ + async getDrainNonce(): Promise { + return this._drainNonce; + } + // ══════════════════════════════════════════════════════════════════ // Town Configuration // ══════════════════════════════════════════════════════════════════ @@ -1105,6 +1179,13 @@ export class TownDO extends DurableObject { // ── Heartbeat ───────────────────────────────────────────────────── + /** + * Update an agent's heartbeat timestamp. Returns the current drain + * nonce (if draining) so the caller can include it in the HTTP + * response without a second RPC — preventing a TOCTOU race where + * an in-flight heartbeat from the old container could observe a + * nonce generated between two separate DO calls. + */ async touchAgentHeartbeat( agentId: string, watermark?: { @@ -1112,9 +1193,10 @@ export class TownDO extends DurableObject { lastEventAt?: string | null; activeTools?: string[]; } - ): Promise { + ): Promise<{ drainNonce: string | null }> { agents.touchAgent(this.sql, agentId, watermark); await this.armAlarmIfNeeded(); + return { drainNonce: this._drainNonce }; } async updateAgentStatusMessage(agentId: string, message: string): Promise { @@ -3113,10 +3195,29 @@ export class TownDO extends DurableObject { Sentry.captureException(err); } + // Auto-clear drain flag if it has been active for too long. + // The drain sequence (drainAll) waits up to 10 minutes, so 15 + // minutes is a generous upper bound. After this timeout the old + // container is certainly dead and it is safe to resume dispatch. + const DRAIN_TIMEOUT_MS = 15 * 60 * 1000; + if ( + this._draining && + this._drainStartedAt && + Date.now() - this._drainStartedAt > DRAIN_TIMEOUT_MS + ) { + this._draining = false; + this._drainNonce = null; + this._drainStartedAt = null; + await this.ctx.storage.put('town:draining', false); + await this.ctx.storage.delete('town:drainNonce'); + await this.ctx.storage.delete('town:drainStartedAt'); + logger.info('reconciler: drain timeout exceeded, auto-clearing draining flag'); + } + // Phase 1: Reconcile — compute desired state vs actual state const sideEffects: Array<() => Promise> = []; try { - const actions = reconciler.reconcile(this.sql); + const actions = reconciler.reconcile(this.sql, { draining: this._draining }); metrics.actionsEmitted = actions.length; for (const a of actions) { metrics.actionsByType[a.type] = (metrics.actionsByType[a.type] ?? 0) + 1; @@ -3648,7 +3749,7 @@ export class TownDO extends DurableObject { if (!hasRigs) return; const hasWork = this.hasActiveWork(); - if (!hasWork) { + if (!hasWork && !this._draining) { const rigList = rigs.listRigs(this.sql); const newestRigAge = rigList.reduce((min, r) => { const age = Date.now() - new Date(r.created_at).getTime(); @@ -3663,8 +3764,27 @@ export class TownDO extends DurableObject { try { const container = getTownContainerStub(this.env, townId); + const headers: Record = {}; + // When draining AND enough time has passed for the old container + // to have exited (drainAll waits up to 10 min + exit), pass the + // nonce so the replacement container can acknowledge readiness. + // We only send the nonce after 11 minutes to avoid the old + // (still-draining) container receiving it and clearing drain + // prematurely — the health check goes to whichever container is + // currently serving this town. + const DRAIN_HANDOFF_DELAY_MS = 11 * 60 * 1000; + if ( + this._draining && + this._drainNonce && + this._drainStartedAt && + Date.now() - this._drainStartedAt > DRAIN_HANDOFF_DELAY_MS + ) { + headers['X-Drain-Nonce'] = this._drainNonce; + headers['X-Town-Id'] = townId; + } await container.fetch('http://container/health', { signal: AbortSignal.timeout(5_000), + headers, }); } catch { // Container is starting up or unavailable — alarm will retry diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index e9fef8d21..404d71d87 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -290,6 +290,13 @@ export function applyEvent(sql: SqlStorage, event: TownEventRecord): void { return; } + case 'container_eviction': { + // Draining flag is managed by the TownDO via KV storage. + // The reconciler reads it from there; no SQL state change needed here. + // The event is recorded for audit trail. + return; + } + case 'nudge_timeout': { // GUPP violations are handled by reconcileGUPP on the next pass. // The event just records the fact for audit trail. @@ -306,11 +313,12 @@ export function applyEvent(sql: SqlStorage, event: TownEventRecord): void { // Top-level reconcile // ════════════════════════════════════════════════════════════════════ -export function reconcile(sql: SqlStorage): Action[] { +export function reconcile(sql: SqlStorage, opts?: { draining?: boolean }): Action[] { + const draining = opts?.draining ?? false; const actions: Action[] = []; actions.push(...reconcileAgents(sql)); - actions.push(...reconcileBeads(sql)); - actions.push(...reconcileReviewQueue(sql)); + actions.push(...reconcileBeads(sql, { draining })); + actions.push(...reconcileReviewQueue(sql, { draining })); actions.push(...reconcileConvoys(sql)); actions.push(...reconcileGUPP(sql)); actions.push(...reconcileGC(sql)); @@ -457,7 +465,8 @@ export function reconcileAgents(sql: SqlStorage): Action[] { // reconcileBeads — handle unassigned beads, lost agents, stale reviews // ════════════════════════════════════════════════════════════════════ -export function reconcileBeads(sql: SqlStorage): Action[] { +export function reconcileBeads(sql: SqlStorage, opts?: { draining?: boolean }): Action[] { + const draining = opts?.draining ?? false; const actions: Action[] = []; // Rule 1: Open issue beads with no assignee, no blockers, not staged, not triage @@ -498,6 +507,10 @@ export function reconcileBeads(sql: SqlStorage): Action[] { for (const bead of unassigned) { if (!bead.rig_id) continue; + if (draining) { + console.log(`${LOG} Town is draining, skipping dispatch for bead ${bead.bead_id}`); + continue; + } // In shadow mode we can't call getOrCreateAgent, so we just note // that a hook_agent + dispatch_agent is needed. // The action includes rig_id so Phase 3's applyAction can resolve the agent. @@ -594,6 +607,13 @@ export function reconcileBeads(sql: SqlStorage): Action[] { if (blockerCount[0]?.cnt > 0) continue; + if (draining) { + console.log( + `${LOG} Town is draining, skipping dispatch for bead ${agent.current_hook_bead_id}` + ); + continue; + } + actions.push({ type: 'dispatch_agent', agent_id: agent.bead_id, @@ -743,7 +763,8 @@ export function reconcileBeads(sql: SqlStorage): Action[] { // refinery dispatch // ════════════════════════════════════════════════════════════════════ -export function reconcileReviewQueue(sql: SqlStorage): Action[] { +export function reconcileReviewQueue(sql: SqlStorage, opts?: { draining?: boolean }): Action[] { + const draining = opts?.draining ?? false; const actions: Action[] = []; // Get all MR beads that need attention @@ -933,6 +954,12 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { if (oldestMr.length === 0) continue; + // Skip dispatch if the town is draining (container eviction in progress) + if (draining) { + console.log(`${LOG} Town is draining, skipping dispatch for bead ${oldestMr[0].bead_id}`); + continue; + } + // If no refinery exists or it's busy, emit a dispatch_agent with empty // agent_id — applyAction will create the refinery via getOrCreateAgent. if (refinery.length === 0) { @@ -1044,6 +1071,13 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { const mr = mrRows[0]; if (mr.type !== 'merge_request' || mr.status !== 'in_progress') continue; + if (draining) { + console.log( + `${LOG} Town is draining, skipping dispatch for bead ${ref.current_hook_bead_id}` + ); + continue; + } + // Container status is checked at apply time (async). In shadow mode, // we just note that a dispatch is needed. actions.push({ diff --git a/cloudflare-gastown/src/gastown.worker.ts b/cloudflare-gastown/src/gastown.worker.ts index 32d676e0a..d5808e0af 100644 --- a/cloudflare-gastown/src/gastown.worker.ts +++ b/cloudflare-gastown/src/gastown.worker.ts @@ -134,6 +134,11 @@ import { handleListEscalations, handleAcknowledgeEscalation, } from './handlers/town-escalations.handler'; +import { + handleContainerEviction, + handleContainerReady, + handleDrainStatus, +} from './handlers/town-eviction.handler'; export { GastownUserDO } from './dos/GastownUser.do'; export { GastownOrgDO } from './dos/GastownOrg.do'; @@ -478,6 +483,29 @@ app.post('/api/towns/:townId/rigs/:rigId/triage/resolve', c => ) ); +// ── Container Eviction ────────────────────────────────────────────────── +// Called by the container on SIGTERM. Uses container JWT auth (not kilo +// user auth), so it must be registered before the kiloAuthMiddleware +// wildcard below. + +app.post('/api/towns/:townId/container-eviction', c => + instrumented(c, 'POST /api/towns/:townId/container-eviction', () => + handleContainerEviction(c, c.req.param()) + ) +); + +app.post('/api/towns/:townId/container-ready', c => + instrumented(c, 'POST /api/towns/:townId/container-ready', () => + handleContainerReady(c, c.req.param()) + ) +); + +app.get('/api/towns/:townId/drain-status', c => + instrumented(c, 'GET /api/towns/:townId/drain-status', () => + handleDrainStatus(c, c.req.param()) + ) +); + // ── Kilo User Auth ────────────────────────────────────────────────────── // Validate Kilo user JWT (signed with NEXTAUTH_SECRET) for dashboard/user // routes. Container→worker routes use the agent JWT middleware instead diff --git a/cloudflare-gastown/src/handlers/rig-agents.handler.ts b/cloudflare-gastown/src/handlers/rig-agents.handler.ts index 24c429a4c..f1dd8062a 100644 --- a/cloudflare-gastown/src/handlers/rig-agents.handler.ts +++ b/cloudflare-gastown/src/handlers/rig-agents.handler.ts @@ -221,7 +221,11 @@ export async function handleHeartbeat( // No body or invalid JSON — old container format, just touch } - await town.touchAgentHeartbeat( + // touchAgentHeartbeat returns the drain nonce atomically — no + // second RPC needed, which prevents a TOCTOU race where an + // in-flight heartbeat from the old container could observe a nonce + // generated between two separate DO calls. + const { drainNonce } = await town.touchAgentHeartbeat( params.agentId, watermark ? { @@ -232,7 +236,7 @@ export async function handleHeartbeat( : undefined ); - return c.json(resSuccess({ heartbeat: true })); + return c.json(resSuccess({ heartbeat: true, ...(drainNonce ? { drainNonce } : {}) })); } const GetOrCreateAgentBody = z.object({ diff --git a/cloudflare-gastown/src/handlers/town-eviction.handler.ts b/cloudflare-gastown/src/handlers/town-eviction.handler.ts new file mode 100644 index 000000000..b60847888 --- /dev/null +++ b/cloudflare-gastown/src/handlers/town-eviction.handler.ts @@ -0,0 +1,155 @@ +import type { Context } from 'hono'; +import { extractBearerToken } from '@kilocode/worker-utils'; +import type { GastownEnv } from '../gastown.worker'; +import { getTownDOStub } from '../dos/Town.do'; +import { verifyContainerJWT } from '../util/jwt.util'; +import { resolveSecret } from '../util/secret.util'; +import { resSuccess, resError } from '../util/res.util'; + +/** + * POST /api/towns/:townId/container-eviction + * + * Called by the container's process-manager when the container receives + * SIGTERM. Inserts a `container_eviction` event and sets the draining + * flag so the reconciler stops dispatching new work. + * + * Returns a `drainNonce` that must be presented via `/container-ready` + * to clear the drain flag. This prevents stale heartbeats from the + * dying container from prematurely re-enabling dispatch. + * + * Authenticated with the container-scoped JWT (same token used for all + * container→worker calls). + */ +export async function handleContainerEviction( + c: Context, + params: { townId: string } +): Promise { + // Authenticate with container JWT + const token = extractBearerToken(c.req.header('Authorization')); + if (!token) { + return c.json(resError('Authentication required'), 401); + } + + const secret = await resolveSecret(c.env.GASTOWN_JWT_SECRET); + if (!secret) { + console.error('[town-eviction] failed to resolve GASTOWN_JWT_SECRET'); + return c.json(resError('Internal server error'), 500); + } + + const result = verifyContainerJWT(token, secret); + if (!result.success) { + return c.json(resError(result.error), 401); + } + + // Cross-town guard + if (result.payload.townId !== params.townId) { + return c.json(resError('Cross-town access denied'), 403); + } + + const town = getTownDOStub(c.env, params.townId); + const drainNonce = await town.recordContainerEviction(); + + console.log(`[town-eviction] container eviction recorded for town=${params.townId}`); + return c.json(resSuccess({ acknowledged: true, drainNonce }), 200); +} + +/** + * GET /api/towns/:townId/drain-status + * + * Lightweight endpoint for the container to poll drain state. Used by + * the heartbeat module when no agents are running — the per-agent + * heartbeat loop has nothing to iterate, so a separate check is needed + * to discover the drain nonce and call /container-ready. + * + * Authenticated with the container-scoped JWT. + */ +export async function handleDrainStatus( + c: Context, + params: { townId: string } +): Promise { + const token = extractBearerToken(c.req.header('Authorization')); + if (!token) { + return c.json(resError('Authentication required'), 401); + } + + const secret = await resolveSecret(c.env.GASTOWN_JWT_SECRET); + if (!secret) { + return c.json(resError('Internal server error'), 500); + } + + const result = verifyContainerJWT(token, secret); + if (!result.success) { + return c.json(resError(result.error), 401); + } + + if (result.payload.townId !== params.townId) { + return c.json(resError('Cross-town access denied'), 403); + } + + const town = getTownDOStub(c.env, params.townId); + const [draining, drainNonce] = await Promise.all([ + town.isDraining(), + town.getDrainNonce(), + ]); + + return c.json(resSuccess({ draining, drainNonce }), 200); +} + +/** + * POST /api/towns/:townId/container-ready + * + * Called by the replacement container on startup to signal readiness. + * Clears the draining flag only if the provided `drainNonce` matches + * the nonce generated during the eviction that triggered the drain. + * + * Authenticated with the container-scoped JWT. + */ +export async function handleContainerReady( + c: Context, + params: { townId: string } +): Promise { + const token = extractBearerToken(c.req.header('Authorization')); + if (!token) { + return c.json(resError('Authentication required'), 401); + } + + const secret = await resolveSecret(c.env.GASTOWN_JWT_SECRET); + if (!secret) { + console.error('[container-ready] failed to resolve GASTOWN_JWT_SECRET'); + return c.json(resError('Internal server error'), 500); + } + + const result = verifyContainerJWT(token, secret); + if (!result.success) { + return c.json(resError(result.error), 401); + } + + if (result.payload.townId !== params.townId) { + return c.json(resError('Cross-town access denied'), 403); + } + + let nonce: string | undefined; + try { + const body: unknown = await c.req.json(); + if ( + body && + typeof body === 'object' && + 'nonce' in body && + typeof (body as { nonce: unknown }).nonce === 'string' + ) { + nonce = (body as { nonce: string }).nonce; + } + } catch { + // No body or invalid JSON + } + + if (!nonce) { + return c.json(resError('Missing required field: nonce'), 400); + } + + const town = getTownDOStub(c.env, params.townId); + const cleared = await town.acknowledgeContainerReady(nonce); + + console.log(`[container-ready] town=${params.townId} nonce=${nonce} cleared=${cleared}`); + return c.json(resSuccess({ cleared }), 200); +} From 5161f96db423a0ee14995a11355f8370c51fa55b Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 31 Mar 2026 20:12:41 -0500 Subject: [PATCH 2/9] feat(container): add drainAll() to process-manager for graceful container eviction (#1795) * feat(container): add drainAll() to process-manager for graceful container eviction Implements 4-phase drain sequence: 1. Notify TownDO via POST /container-eviction 2. Nudge running polecat/refinery agents via sendMessage() 3. Poll up to 10 min waiting for agents to finish 4. Force-save stragglers with WIP git commit + push via Bun.spawn * fix(tracking): only resolve signup product when callbackPath was explicitly provided (#1793) * fix(tracking): only resolve signup product when callbackPath was explicitly provided * style: apply oxfmt formatting to after-sign-in route * fix(drain): address PR review comments on drainAll() - Phase 1: wrap TownDO fetch in 10s abortable timeout so a hung endpoint cannot stall the entire drain sequence - Phase 2: clear idle timers before sending eviction nudge so a pre-existing timer cannot race and flip the agent to exited - Phase 4: abort SDK sessions before force-saving so git operations don't race with a still-running agent's worktree writes * fix(container): clear idle timers and abort sessions before Phase 4 git save Phase 4 (force-save stragglers) now runs in two sub-steps: 1. Freeze: cancel idle timers, abort event subscriptions, abort SDK sessions, and mark agents as exited. This prevents the normal completion path (idle timer -> onExit -> bead completion) from racing with the WIP snapshot. 2. Snapshot: git add/commit/push each worktree after all sessions are frozen, avoiding .git/index.lock collisions. Also adds AbortSignal.timeout(10s) to Phase 1 TownDO notification and clearIdleTimer before Phase 2 nudge messages. * fix(container): skip WIP snapshot for agents whose freeze failed Build the 4b snapshot list from successfully frozen agents only. If session.abort() throws in 4a, the agent is excluded from snapshotting to avoid racing git against a still-active session. * fix(container): set upstream on eviction push for first-time branches Use git push --set-upstream origin HEAD so branches without a configured upstream (e.g. freshly created agent worktrees) can push the WIP eviction snapshot instead of failing silently. * fix(container): skip git push for lightweight workspaces with no origin remote Phase 4b of drainAll() assumed all workspaces have an 'origin' remote. Lightweight workspaces (mayor/triage) are initialized with 'git init' and never add a remote, causing 'fatal: origin does not appear to be a git repository' during eviction. Check for the origin remote via 'git remote get-url origin' before attempting to push. When no remote exists, commit locally only and log a warning. --------- Co-authored-by: Pedro Heyerdahl <61753986+pedroheyerdahl@users.noreply.github.com> --- .../container/src/process-manager.ts | 193 ++++++++++++++++++ 1 file changed, 193 insertions(+) diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index f0008e320..eca69e631 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -973,6 +973,199 @@ export function activeServerCount(): number { return sdkInstances.size; } +/** + * Gracefully drain all running agents before container eviction. + * + * 4-phase sequence: + * 1. Notify TownDO of the eviction (fire-and-forget) + * 2. Nudge running polecats/refineries to commit & push + * 3. Poll up to 10 min waiting for agents to finish + * 4. Force-save any stragglers via WIP git commit + push + * + * Never throws — all errors are logged and swallowed so the caller + * can always proceed to stopAll() + process.exit(). + */ +export async function drainAll(): Promise { + const DRAIN_LOG = '[drain]'; + + // ── Phase 1: Notify TownDO ────────────────────────────────────────── + try { + const apiUrl = process.env.GASTOWN_API_URL; + const token = process.env.GASTOWN_CONTAINER_TOKEN; + // Grab townId from any registered agent — all agents in a container + // belong to the same town. + const anyAgent = [...agents.values()][0]; + const townId = anyAgent?.townId; + + if (apiUrl && token && townId) { + console.log(`${DRAIN_LOG} Phase 1: notifying TownDO of container eviction`); + const resp = await fetch(`${apiUrl}/api/towns/${townId}/container-eviction`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${token}`, + }, + signal: AbortSignal.timeout(10_000), + }); + console.log(`${DRAIN_LOG} Phase 1: TownDO responded ${resp.status}`); + } else { + console.warn( + `${DRAIN_LOG} Phase 1: skipping TownDO notification (missing apiUrl=${!!apiUrl} token=${!!token} townId=${!!townId})` + ); + } + } catch (err) { + console.warn(`${DRAIN_LOG} Phase 1: TownDO notification failed, continuing:`, err); + } + + // ── Phase 2: Nudge running agents to save ─────────────────────────── + const runningAgents = [...agents.values()].filter(a => a.status === 'running'); + console.log(`${DRAIN_LOG} Phase 2: nudging ${runningAgents.length} running agents`); + + for (const agent of runningAgents) { + try { + let nudgeMessage: string | null = null; + + if (agent.role === 'polecat') { + nudgeMessage = + 'URGENT: The container is shutting down in ~15 minutes. Please commit and push your current changes immediately, then call gt_done. You have 2 minutes before a forced save.'; + } else if (agent.role === 'refinery') { + nudgeMessage = + 'URGENT: The container is shutting down. If your review is complete, call gt_done now. Otherwise your work will be pushed as a WIP commit.'; + } + // Mayor and other roles: no nudge needed + + if (nudgeMessage) { + // Cancel the idle timer before nudging — if the agent was + // already idle, the timer could fire mid-nudge and exit the + // agent before it processes the eviction message. + clearIdleTimer(agent.agentId); + console.log(`${DRAIN_LOG} Phase 2: nudging ${agent.role} agent ${agent.agentId}`); + await sendMessage(agent.agentId, nudgeMessage); + } + } catch (err) { + console.warn( + `${DRAIN_LOG} Phase 2: failed to nudge agent ${agent.agentId} (${agent.role}):`, + err + ); + } + } + + // ── Phase 3: Wait up to 10 minutes ────────────────────────────────── + const DRAIN_WAIT_MS = 10 * 60 * 1000; + const pollInterval = 5000; + const start = Date.now(); + console.log(`${DRAIN_LOG} Phase 3: waiting up to ${DRAIN_WAIT_MS / 1000}s for agents to finish`); + + while (Date.now() - start < DRAIN_WAIT_MS) { + const running = [...agents.values()].filter(a => a.status === 'running'); + if (running.length === 0) break; + console.log(`${DRAIN_LOG} Waiting for ${running.length} agents...`); + await new Promise(r => setTimeout(r, pollInterval)); + } + + // ── Phase 4: Force-save remaining agents ──────────────────────────── + // Two sub-steps: first freeze all stragglers (cancel idle timers, + // abort event subscriptions and SDK sessions), then snapshot each + // worktree. Freezing first prevents the normal completion path + // (idle timer → onExit → bead completion) from racing with the WIP + // git save, and avoids .git/index.lock collisions with agent git ops. + const stragglers = [...agents.values()].filter(a => a.status === 'running'); + if (stragglers.length > 0) { + console.log(`${DRAIN_LOG} Phase 4: freezing ${stragglers.length} straggler(s)`); + } else { + console.log(`${DRAIN_LOG} Phase 4: all agents finished, no force-save needed`); + } + + // 4a: Freeze — cancel idle timers and abort sessions so no + // completion/exit callbacks can fire during the git snapshot. + // Only agents that freeze successfully are safe to snapshot. + const frozen: typeof stragglers = []; + for (const agent of stragglers) { + try { + // Cancel idle timer FIRST — prevents the timer from firing and + // marking the agent as completed via onExit() while we abort. + clearIdleTimer(agent.agentId); + + // Abort event subscription + const controller = eventAbortControllers.get(agent.agentId); + if (controller) { + controller.abort(); + eventAbortControllers.delete(agent.agentId); + } + + // Abort the SDK session + const instance = sdkInstances.get(agent.workdir); + if (instance) { + await instance.client.session.abort({ + path: { id: agent.sessionId }, + }); + } + + agent.status = 'exited'; + agent.exitReason = 'container eviction'; + frozen.push(agent); + console.log(`${DRAIN_LOG} Phase 4: froze agent ${agent.agentId}`); + } catch (err) { + // Freeze failed — the session may still be writing to the + // worktree. Skip this agent in 4b to avoid .git/index.lock + // races and partial snapshots. + console.warn( + `${DRAIN_LOG} Phase 4: failed to freeze agent ${agent.agentId}, skipping snapshot:`, + err + ); + } + } + + // 4b: Snapshot — git add/commit/push each worktree now that + // all sessions are frozen. Only iterate agents that froze + // successfully; unfrozen agents are skipped to avoid racing + // with a still-active SDK session. + for (const agent of frozen) { + try { + console.log(`${DRAIN_LOG} Phase 4: force-saving agent ${agent.agentId} in ${agent.workdir}`); + + // Check whether a remote named "origin" exists. Lightweight + // workspaces (mayor/triage) are created with `git init` and + // never add a remote, so pushing would fail with + // "fatal: 'origin' does not appear to be a git repository". + const remoteCheck = Bun.spawn(['git', 'remote', 'get-url', 'origin'], { + cwd: agent.workdir, + stdout: 'pipe', + stderr: 'pipe', + }); + const hasOrigin = (await remoteCheck.exited) === 0; + + const gitCmd = hasOrigin + ? "git add -A && git commit --allow-empty -m 'WIP: container eviction save' && git push --set-upstream origin HEAD --no-verify" + : "git add -A && git commit --allow-empty -m 'WIP: container eviction save'"; + + if (!hasOrigin) { + console.warn( + `${DRAIN_LOG} Phase 4: no origin remote for agent ${agent.agentId}, committing locally only (push skipped)` + ); + } + + const proc = Bun.spawn(['bash', '-c', gitCmd], { + cwd: agent.workdir, + stdout: 'pipe', + stderr: 'pipe', + }); + const exitCode = await proc.exited; + const stdout = await new Response(proc.stdout).text(); + const stderr = await new Response(proc.stderr).text(); + console.log( + `${DRAIN_LOG} Phase 4: agent ${agent.agentId} git save exited ${exitCode}` + + (stdout ? ` stdout=${stdout.trim()}` : '') + + (stderr ? ` stderr=${stderr.trim()}` : '') + ); + } catch (err) { + console.warn(`${DRAIN_LOG} Phase 4: force-save failed for agent ${agent.agentId}:`, err); + } + } + + console.log(`${DRAIN_LOG} Drain complete`); +} + export async function stopAll(): Promise { // Cancel all idle timers for (const [, timer] of idleTimers) { From 9a95f63faf29cc5568fca3779de397d8e703f5b8 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 01:15:38 +0000 Subject: [PATCH 3/9] feat(container): use drainAll in SIGTERM handler for graceful eviction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SIGTERM now runs stopHeartbeat → drainAll → stopAll → exit(0) to allow in-flight agent work to finish before the container shuts down. SIGINT retains the immediate shutdown path for local dev. --- cloudflare-gastown/container/src/control-server.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index 86b1d5c8d..ef3a181bb 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -10,6 +10,7 @@ import { activeServerCount, getUptime, stopAll, + drainAll, getAgentEvents, registerEventSink, } from './process-manager'; @@ -732,7 +733,7 @@ export function startControlServer(): void { startHeartbeat(apiUrl, authToken); } - // Handle graceful shutdown + // Handle graceful shutdown (immediate, no drain — used by SIGINT for dev) const shutdown = async () => { console.log('Shutting down control server...'); stopHeartbeat(); @@ -740,7 +741,14 @@ export function startControlServer(): void { process.exit(0); }; - process.on('SIGTERM', () => void shutdown()); + process.on('SIGTERM', () => void (async () => { + console.log('[control-server] SIGTERM received — starting graceful drain...'); + stopHeartbeat(); + await drainAll(); + await stopAll(); + process.exit(0); + })()); + process.on('SIGINT', () => void shutdown()); // Track connected WebSocket clients with optional agent filter From e1bb9ad3baf20e3effbf713dab0df9e68ca506a5 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 01:16:48 +0000 Subject: [PATCH 4/9] style: fix formatting from oxfmt --- .../container/src/control-server.ts | 18 +++++++++++------- cloudflare-gastown/src/gastown.worker.ts | 4 +--- .../src/handlers/town-eviction.handler.ts | 5 +---- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index ef3a181bb..2e7d4793e 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -741,13 +741,17 @@ export function startControlServer(): void { process.exit(0); }; - process.on('SIGTERM', () => void (async () => { - console.log('[control-server] SIGTERM received — starting graceful drain...'); - stopHeartbeat(); - await drainAll(); - await stopAll(); - process.exit(0); - })()); + process.on( + 'SIGTERM', + () => + void (async () => { + console.log('[control-server] SIGTERM received — starting graceful drain...'); + stopHeartbeat(); + await drainAll(); + await stopAll(); + process.exit(0); + })() + ); process.on('SIGINT', () => void shutdown()); diff --git a/cloudflare-gastown/src/gastown.worker.ts b/cloudflare-gastown/src/gastown.worker.ts index d5808e0af..2f0738c86 100644 --- a/cloudflare-gastown/src/gastown.worker.ts +++ b/cloudflare-gastown/src/gastown.worker.ts @@ -501,9 +501,7 @@ app.post('/api/towns/:townId/container-ready', c => ); app.get('/api/towns/:townId/drain-status', c => - instrumented(c, 'GET /api/towns/:townId/drain-status', () => - handleDrainStatus(c, c.req.param()) - ) + instrumented(c, 'GET /api/towns/:townId/drain-status', () => handleDrainStatus(c, c.req.param())) ); // ── Kilo User Auth ────────────────────────────────────────────────────── diff --git a/cloudflare-gastown/src/handlers/town-eviction.handler.ts b/cloudflare-gastown/src/handlers/town-eviction.handler.ts index b60847888..36d7a8e14 100644 --- a/cloudflare-gastown/src/handlers/town-eviction.handler.ts +++ b/cloudflare-gastown/src/handlers/town-eviction.handler.ts @@ -87,10 +87,7 @@ export async function handleDrainStatus( } const town = getTownDOStub(c.env, params.townId); - const [draining, drainNonce] = await Promise.all([ - town.isDraining(), - town.getDrainNonce(), - ]); + const [draining, drainNonce] = await Promise.all([town.isDraining(), town.getDrainNonce()]); return c.json(resSuccess({ draining, drainNonce }), 200); } From 09e5b3231e3edfc722b9208f89026c60aee4e0cb Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Tue, 31 Mar 2026 18:51:13 +0000 Subject: [PATCH 5/9] test(gastown): add integration tests for container_eviction draining lifecycle - Add 3 TownDO draining lifecycle tests in town-container.test.ts: - recordContainerEviction() sets draining flag - touchAgentHeartbeat() clears draining flag after eviction - draining flag persists across re-initialization - Add 1 reconciler applyEvent test in reconciler.test.ts: - container_eviction event is processed without error (audit-only no-op) --- .../test/integration/reconciler.test.ts | 13 +++++ .../test/integration/town-container.test.ts | 53 +++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/cloudflare-gastown/test/integration/reconciler.test.ts b/cloudflare-gastown/test/integration/reconciler.test.ts index e5e286924..e46e7cae3 100644 --- a/cloudflare-gastown/test/integration/reconciler.test.ts +++ b/cloudflare-gastown/test/integration/reconciler.test.ts @@ -485,5 +485,18 @@ describe('Reconciler', () => { const statusAfter = await town.getConvoyStatus(result.convoy.id); expect(statusAfter?.staged).toBe(false); }); + + it('should process container_eviction event without error', async () => { + // Record a container eviction (inserts a container_eviction event) + await town.recordContainerEviction(); + + // Run alarm — the reconciler should drain the container_eviction + // event and process it as a no-op (audit-only) without throwing. + await runDurableObjectAlarm(town); + + // The draining flag should still be true (the reconciler doesn't + // clear it — only a heartbeat does). + expect(await town.isDraining()).toBe(true); + }); }); }); diff --git a/cloudflare-gastown/test/integration/town-container.test.ts b/cloudflare-gastown/test/integration/town-container.test.ts index 0849168b9..1f815da14 100644 --- a/cloudflare-gastown/test/integration/town-container.test.ts +++ b/cloudflare-gastown/test/integration/town-container.test.ts @@ -119,3 +119,56 @@ describe('Town DO — touchAgentHeartbeat', () => { expect(updated!.last_activity_at).not.toBe(initialActivity); }); }); + +// ── Container Eviction (draining lifecycle) ──────────────────────────── + +describe('Town DO — container eviction draining lifecycle', () => { + it('recordContainerEviction() sets draining flag', async () => { + const id = `town-${crypto.randomUUID()}`; + const town = env.TOWN.get(env.TOWN.idFromName(id)); + + // Before eviction, isDraining should be false + expect(await town.isDraining()).toBe(false); + + // Record eviction + await town.recordContainerEviction(); + + // isDraining should now be true + expect(await town.isDraining()).toBe(true); + }); + + it('touchAgentHeartbeat() clears draining flag after eviction', async () => { + const id = `town-${crypto.randomUUID()}`; + const town = env.TOWN.get(env.TOWN.idFromName(id)); + + // Register an agent so heartbeat has a valid target + const agent = await town.registerAgent({ + role: 'polecat', + name: 'drain-clear-test', + identity: `drain-clear-${id}`, + }); + + // Set draining flag + await town.recordContainerEviction(); + expect(await town.isDraining()).toBe(true); + + // Heartbeat should clear draining flag + await town.touchAgentHeartbeat(agent.id); + expect(await town.isDraining()).toBe(false); + }); + + it('draining flag persists across re-initialization', async () => { + const id = `town-${crypto.randomUUID()}`; + const town = env.TOWN.get(env.TOWN.idFromName(id)); + + // Record eviction on the first stub reference + await town.recordContainerEviction(); + expect(await town.isDraining()).toBe(true); + + // Get a fresh stub reference to the same DO (same name = same instance). + // The DO's blockConcurrencyWhile loads town:draining from KV on init, + // so the draining flag should survive. + const town2 = env.TOWN.get(env.TOWN.idFromName(id)); + expect(await town2.isDraining()).toBe(true); + }); +}); From bd8748348c467dfe26fab330e1c4905dfa555f90 Mon Sep 17 00:00:00 2001 From: "kilo-code-bot[bot]" <240665456+kilo-code-bot[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 21:58:28 +0200 Subject: [PATCH 6/9] perf(providers): optimize gateway error rate query to avoid unnecessary view joins (#1865) The query was using microdollar_usage_view which joins 13+ tables via LEFT JOINs, but only needs columns from microdollar_usage and microdollar_usage_metadata. Query now directly joins only these two tables. Co-authored-by: kiloconnect[bot] <240665456+kiloconnect[bot]@users.noreply.github.com> --- src/lib/providers/gateway-error-rate.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/lib/providers/gateway-error-rate.ts b/src/lib/providers/gateway-error-rate.ts index 8dbcdb64e..54bde1025 100644 --- a/src/lib/providers/gateway-error-rate.ts +++ b/src/lib/providers/gateway-error-rate.ts @@ -7,14 +7,15 @@ const getGatewayErrorRate_cached = unstable_cache( console.debug(`[getGatewayErrorRate_cached] refreshing at ${new Date().toISOString()}`); const { rows } = await db.execute(sql` select - provider as "gateway", - 1.0 * count(*) filter(where has_error = true) / count(*) as "errorRate" - from microdollar_usage_view + mu.provider as "gateway", + 1.0 * count(*) filter(where mu.has_error = true) / count(*) as "errorRate" + from microdollar_usage mu + join microdollar_usage_metadata meta on mu.id = meta.id where true - and created_at >= now() - interval '10 minutes' - and is_user_byok = false - and provider in ('openrouter', 'vercel') - group by provider + and mu.created_at >= now() - interval '10 minutes' + and meta.is_user_byok = false + and mu.provider in ('openrouter', 'vercel') + group by mu.provider `); return z .array( From f998129a9e2ac6963c95fcb05deb383ab17486fb Mon Sep 17 00:00:00 2001 From: "kilo-code-bot[bot]" <240665456+kilo-code-bot[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 22:53:29 +0200 Subject: [PATCH 7/9] feat(admin): add safety identifier backfill panel (#1863) * feat(admin): add safety identifier backfill panel * feat(admin): show Vercel Downstream Safety Identifier in user admin UI * refactor(admin): merge safety identifier backfill into single API call * refactor(admin): single query for safety identifier backfill * fmt * Delete obsolete script --------- Co-authored-by: kiloconnect[bot] <240665456+kiloconnect[bot]@users.noreply.github.com> Co-authored-by: Christiaan Arnoldus --- src/app/admin/api/safety-identifiers/route.ts | 76 ++++++++++++++ src/app/admin/components/AppSidebar.tsx | 6 ++ .../components/SafetyIdentifiersBackfill.tsx | 99 +++++++++++++++++++ .../UserAdmin/UserAdminAccountInfo.tsx | 15 +++ src/app/admin/safety-identifiers/page.tsx | 24 +++++ src/lib/user.test.ts | 2 + .../openrouter/backfill-safety-identifier.ts | 40 -------- 7 files changed, 222 insertions(+), 40 deletions(-) create mode 100644 src/app/admin/api/safety-identifiers/route.ts create mode 100644 src/app/admin/components/SafetyIdentifiersBackfill.tsx create mode 100644 src/app/admin/safety-identifiers/page.tsx delete mode 100644 src/scripts/openrouter/backfill-safety-identifier.ts diff --git a/src/app/admin/api/safety-identifiers/route.ts b/src/app/admin/api/safety-identifiers/route.ts new file mode 100644 index 000000000..c9cb74867 --- /dev/null +++ b/src/app/admin/api/safety-identifiers/route.ts @@ -0,0 +1,76 @@ +import { NextResponse } from 'next/server'; +import { getUserFromAuth } from '@/lib/user.server'; +import { db } from '@/lib/drizzle'; +import { kilocode_users } from '@kilocode/db'; +import { + generateOpenRouterUpstreamSafetyIdentifier, + generateVercelDownstreamSafetyIdentifier, +} from '@/lib/providerHash'; +import { isNull, count, or, desc, eq } from 'drizzle-orm'; + +const missingEither = or( + isNull(kilocode_users.openrouter_upstream_safety_identifier), + isNull(kilocode_users.vercel_downstream_safety_identifier) +); + +export type SafetyIdentifierCountsResponse = { + missing: number; +}; + +export type BackfillBatchResponse = { + processed: number; + remaining: boolean; +}; + +export async function GET(): Promise< + NextResponse +> { + const { authFailedResponse } = await getUserFromAuth({ adminOnly: true }); + if (authFailedResponse) return authFailedResponse; + + const [result] = await db.select({ count: count() }).from(kilocode_users).where(missingEither); + + return NextResponse.json({ missing: result?.count ?? 0 }); +} + +export async function POST(): Promise> { + const { authFailedResponse } = await getUserFromAuth({ adminOnly: true }); + if (authFailedResponse) return authFailedResponse; + + const processed = await db.transaction(async tran => { + const rows = await tran + .select({ id: kilocode_users.id }) + .from(kilocode_users) + .where(missingEither) + .orderBy(desc(kilocode_users.created_at)) + .limit(1000); + + for (const user of rows) { + const openrouter_upstream_safety_identifier = generateOpenRouterUpstreamSafetyIdentifier( + user.id + ); + if (openrouter_upstream_safety_identifier === null) { + return null; + } + await tran + .update(kilocode_users) + .set({ + openrouter_upstream_safety_identifier, + vercel_downstream_safety_identifier: generateVercelDownstreamSafetyIdentifier(user.id), + }) + .where(eq(kilocode_users.id, user.id)) + .execute(); + } + + return rows.length; + }); + + if (processed === null) { + return NextResponse.json( + { error: 'OPENROUTER_ORG_ID is not configured on this server' }, + { status: 500 } + ); + } + + return NextResponse.json({ processed, remaining: processed === 1000 }); +} diff --git a/src/app/admin/components/AppSidebar.tsx b/src/app/admin/components/AppSidebar.tsx index 0fc8480f8..8eddc3788 100644 --- a/src/app/admin/components/AppSidebar.tsx +++ b/src/app/admin/components/AppSidebar.tsx @@ -23,6 +23,7 @@ import { Bell, Server, Network, + KeyRound, } from 'lucide-react'; import { useSession } from 'next-auth/react'; import type { Session } from 'next-auth'; @@ -79,6 +80,11 @@ const userManagementItems: MenuItem[] = [ url: '/admin/blacklisted-domains', icon: () => , }, + { + title: () => 'Safety Identifiers', + url: '/admin/safety-identifiers', + icon: () => , + }, ]; const financialItems: MenuItem[] = [ diff --git a/src/app/admin/components/SafetyIdentifiersBackfill.tsx b/src/app/admin/components/SafetyIdentifiersBackfill.tsx new file mode 100644 index 000000000..5c7b8ed64 --- /dev/null +++ b/src/app/admin/components/SafetyIdentifiersBackfill.tsx @@ -0,0 +1,99 @@ +'use client'; + +import { useState } from 'react'; +import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; +import { Button } from '@/components/ui/button'; +import { Alert, AlertDescription } from '@/components/ui/alert'; +import { Badge } from '@/components/ui/badge'; +import type { + SafetyIdentifierCountsResponse, + BackfillBatchResponse, +} from '../api/safety-identifiers/route'; + +type BatchLog = { + processed: number; + timestamp: Date; +}; + +export function SafetyIdentifiersBackfill() { + const [logs, setLogs] = useState([]); + const queryClient = useQueryClient(); + + const { data: counts, isLoading } = useQuery({ + queryKey: ['safety-identifier-counts'], + queryFn: async () => { + const res = await fetch('/admin/api/safety-identifiers'); + return res.json() as Promise; + }, + refetchInterval: false, + }); + + const mutation = useMutation({ + mutationFn: async () => { + const res = await fetch('/admin/api/safety-identifiers', { method: 'POST' }); + if (!res.ok) { + const body = (await res.json()) as { error?: string }; + throw new Error(body.error ?? `HTTP ${res.status}`); + } + return res.json() as Promise; + }, + onSuccess: data => { + setLogs(prev => [{ processed: data.processed, timestamp: new Date() }, ...prev]); + void queryClient.invalidateQueries({ queryKey: ['safety-identifier-counts'] }); + }, + }); + + const isDone = counts?.missing === 0; + + return ( +
+

+ Backfill safety identifiers for users missing either field. Each click processes up to 1 000 + users. Click repeatedly until the counter reaches zero. +

+ +
+
+ Users missing a safety identifier + {isLoading ? ( + Loading… + ) : isDone ? ( + + All filled + + ) : ( + {(counts?.missing ?? 0).toLocaleString()} missing + )} +
+ + {mutation.isError && ( + + {mutation.error.message} + + )} + + +
+ + {logs.length > 0 && ( +
+

Batch log

+
+ {logs.map((log, i) => ( +
+ {log.timestamp.toLocaleTimeString()} + processed {log.processed.toLocaleString()} users +
+ ))} +
+
+ )} +
+ ); +} diff --git a/src/app/admin/components/UserAdmin/UserAdminAccountInfo.tsx b/src/app/admin/components/UserAdmin/UserAdminAccountInfo.tsx index 3566922e2..f60db2173 100644 --- a/src/app/admin/components/UserAdmin/UserAdminAccountInfo.tsx +++ b/src/app/admin/components/UserAdmin/UserAdminAccountInfo.tsx @@ -108,6 +108,21 @@ export function UserAdminAccountInfo(user: UserAdminAccountInfoProps) {

N/A

)} +
+

+ Vercel Downstream Safety Identifier +

+ {user.vercel_downstream_safety_identifier ? ( +
+

+ {user.vercel_downstream_safety_identifier} +

+ +
+ ) : ( +

N/A

+ )} +
diff --git a/src/app/admin/safety-identifiers/page.tsx b/src/app/admin/safety-identifiers/page.tsx new file mode 100644 index 000000000..f0ed5dbb0 --- /dev/null +++ b/src/app/admin/safety-identifiers/page.tsx @@ -0,0 +1,24 @@ +import { SafetyIdentifiersBackfill } from '../components/SafetyIdentifiersBackfill'; +import AdminPage from '../components/AdminPage'; +import { BreadcrumbItem, BreadcrumbPage } from '@/components/ui/breadcrumb'; + +const breadcrumbs = ( + <> + + Safety Identifiers + + +); + +export default function SafetyIdentifiersPage() { + return ( + +
+
+

Safety Identifier Backfill

+
+ +
+
+ ); +} diff --git a/src/lib/user.test.ts b/src/lib/user.test.ts index 04109844e..612a005dc 100644 --- a/src/lib/user.test.ts +++ b/src/lib/user.test.ts @@ -92,6 +92,7 @@ describe('User', () => { linkedin_url: 'https://linkedin.com/in/testuser', github_url: 'https://github.com/testuser', openrouter_upstream_safety_identifier: 'openrouter_upstream_safety_identifier', + vercel_downstream_safety_identifier: 'vercel_downstream_safety_identifier', customer_source: 'A YouTube video', is_admin: true, }); @@ -108,6 +109,7 @@ describe('User', () => { expect(softDeleted!.github_url).toBeNull(); expect(softDeleted!.discord_server_membership_verified_at).toBeNull(); expect(softDeleted!.openrouter_upstream_safety_identifier).toBeNull(); + expect(softDeleted!.vercel_downstream_safety_identifier).toBeNull(); expect(softDeleted!.customer_source).toBeNull(); expect(softDeleted!.api_token_pepper).toBeNull(); expect(softDeleted!.default_model).toBeNull(); diff --git a/src/scripts/openrouter/backfill-safety-identifier.ts b/src/scripts/openrouter/backfill-safety-identifier.ts deleted file mode 100644 index c8e1f690c..000000000 --- a/src/scripts/openrouter/backfill-safety-identifier.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { db } from '@/lib/drizzle'; -import { generateOpenRouterUpstreamSafetyIdentifier } from '@/lib/providerHash'; -import { kilocode_users } from '@kilocode/db'; -import { isNull, desc, eq } from 'drizzle-orm'; - -export async function run() { - while (true) { - const count = await db.transaction(async tran => { - const rows = await tran - .select({ - id: kilocode_users.id, - }) - .from(kilocode_users) - .where(isNull(kilocode_users.openrouter_upstream_safety_identifier)) - .orderBy(desc(kilocode_users.created_at)) - .limit(1000); - if (rows.length === 0) { - return 0; - } - console.log(`Batch of ${rows.length} users`); - for (const user of rows) { - const openrouter_upstream_safety_identifier = generateOpenRouterUpstreamSafetyIdentifier( - user.id - ); - await tran - .update(kilocode_users) - .set({ - openrouter_upstream_safety_identifier, - }) - .where(eq(kilocode_users.id, user.id)) - .execute(); - } - console.log('Commit'); - return rows.length; - }); - if (count === 0) { - break; - } - } -} From 9da15e89b2703e41f9843138e68d8fe7062a04f9 Mon Sep 17 00:00:00 2001 From: Emilie Lima Schario <14057155+emilieschario@users.noreply.github.com> Date: Wed, 1 Apr 2026 17:18:30 -0400 Subject: [PATCH 8/9] remove beta label for KiloClaw (#1871) remove beta label --- src/app/(app)/claw/components/ClawHeader.tsx | 1 - 1 file changed, 1 deletion(-) diff --git a/src/app/(app)/claw/components/ClawHeader.tsx b/src/app/(app)/claw/components/ClawHeader.tsx index 0f7ccfc9d..ca9c827d0 100644 --- a/src/app/(app)/claw/components/ClawHeader.tsx +++ b/src/app/(app)/claw/components/ClawHeader.tsx @@ -28,7 +28,6 @@ export function ClawHeader({ title="KiloClaw" icon={} > - Beta {statusInfo && ( {statusInfo.label} From 892db1cbb113314d6bd9af541cd9fd5d71099d9a Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 22:46:58 +0000 Subject: [PATCH 9/9] fix(gastown): fix eviction test assertions and replace as casts with Zod validation - Replace incorrect test that asserted touchAgentHeartbeat() clears drain flag (it doesn't) with tests for acknowledgeContainerReady() which is the actual mechanism that clears drain state - Add test for nonce mismatch case (wrong nonce keeps draining=true) - Add test verifying touchAgentHeartbeat returns drainNonce but doesn't clear draining flag, and only acknowledgeContainerReady clears it - Replace manual 'as' casts in handleContainerReady with Zod schema validation per project conventions --- .../src/handlers/town-eviction.handler.ts | 20 ++------ .../test/integration/town-container.test.ts | 46 ++++++++++++++++--- 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/cloudflare-gastown/src/handlers/town-eviction.handler.ts b/cloudflare-gastown/src/handlers/town-eviction.handler.ts index 36d7a8e14..b92e6a502 100644 --- a/cloudflare-gastown/src/handlers/town-eviction.handler.ts +++ b/cloudflare-gastown/src/handlers/town-eviction.handler.ts @@ -1,4 +1,5 @@ import type { Context } from 'hono'; +import { z } from 'zod'; import { extractBearerToken } from '@kilocode/worker-utils'; import type { GastownEnv } from '../gastown.worker'; import { getTownDOStub } from '../dos/Town.do'; @@ -125,24 +126,13 @@ export async function handleContainerReady( return c.json(resError('Cross-town access denied'), 403); } - let nonce: string | undefined; - try { - const body: unknown = await c.req.json(); - if ( - body && - typeof body === 'object' && - 'nonce' in body && - typeof (body as { nonce: unknown }).nonce === 'string' - ) { - nonce = (body as { nonce: string }).nonce; - } - } catch { - // No body or invalid JSON - } + const ContainerReadyBody = z.object({ nonce: z.string() }); - if (!nonce) { + const parsed = ContainerReadyBody.safeParse(await c.req.json().catch(() => null)); + if (!parsed.success) { return c.json(resError('Missing required field: nonce'), 400); } + const { nonce } = parsed.data; const town = getTownDOStub(c.env, params.townId); const cleared = await town.acknowledgeContainerReady(nonce); diff --git a/cloudflare-gastown/test/integration/town-container.test.ts b/cloudflare-gastown/test/integration/town-container.test.ts index 1f815da14..f9313f052 100644 --- a/cloudflare-gastown/test/integration/town-container.test.ts +++ b/cloudflare-gastown/test/integration/town-container.test.ts @@ -137,23 +137,57 @@ describe('Town DO — container eviction draining lifecycle', () => { expect(await town.isDraining()).toBe(true); }); - it('touchAgentHeartbeat() clears draining flag after eviction', async () => { + it('acknowledgeContainerReady() clears draining flag with correct nonce', async () => { + const id = `town-${crypto.randomUUID()}`; + const town = env.TOWN.get(env.TOWN.idFromName(id)); + + // Set draining flag and capture the nonce + const drainNonce = await town.recordContainerEviction(); + expect(await town.isDraining()).toBe(true); + + // Acknowledge with the correct nonce clears the drain flag + const cleared = await town.acknowledgeContainerReady(drainNonce); + expect(cleared).toBe(true); + expect(await town.isDraining()).toBe(false); + }); + + it('acknowledgeContainerReady() rejects wrong nonce and stays draining', async () => { + const id = `town-${crypto.randomUUID()}`; + const town = env.TOWN.get(env.TOWN.idFromName(id)); + + // Set draining flag + await town.recordContainerEviction(); + expect(await town.isDraining()).toBe(true); + + // Wrong nonce should not clear the drain flag + const cleared = await town.acknowledgeContainerReady('wrong-nonce'); + expect(cleared).toBe(false); + expect(await town.isDraining()).toBe(true); + }); + + it('touchAgentHeartbeat() returns drainNonce during eviction', async () => { const id = `town-${crypto.randomUUID()}`; const town = env.TOWN.get(env.TOWN.idFromName(id)); // Register an agent so heartbeat has a valid target const agent = await town.registerAgent({ role: 'polecat', - name: 'drain-clear-test', - identity: `drain-clear-${id}`, + name: 'drain-nonce-test', + identity: `drain-nonce-${id}`, }); // Set draining flag - await town.recordContainerEviction(); + const drainNonce = await town.recordContainerEviction(); expect(await town.isDraining()).toBe(true); - // Heartbeat should clear draining flag - await town.touchAgentHeartbeat(agent.id); + // Heartbeat returns the drainNonce (but does NOT clear draining) + const heartbeatResult = await town.touchAgentHeartbeat(agent.id); + expect(heartbeatResult.drainNonce).toBe(drainNonce); + expect(await town.isDraining()).toBe(true); + + // Only acknowledgeContainerReady with the nonce clears it + const cleared = await town.acknowledgeContainerReady(drainNonce); + expect(cleared).toBe(true); expect(await town.isDraining()).toBe(false); });