From 38602bdf2fe94ae6d041de782f470a27133b90dd Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 03:49:57 +0000 Subject: [PATCH 1/7] fix(gastown): stop false-positive invariant violations spamming logs every 5s MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Invariant 7: Exclude mayors from the 'working agent must have a hook' check — mayors are always working and intentionally hookless. Invariant 5: Widen valid convoy bead states to include in_progress and in_review, which are legitimate transient states while child beads are being worked on. Only flag truly unexpected states. Fixes #1364 --- cloudflare-gastown/src/dos/town/reconciler.ts | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index e9fef8d21..4f8664f96 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -1494,6 +1494,7 @@ export function checkInvariants(sql: SqlStorage): Violation[] { const violations: Violation[] = []; // Invariant 7: Working agents must have hooks + // Mayors are always 'working' and intentionally have no hook — exclude them. const unhookedWorkers = z .object({ bead_id: z.string() }) .array() @@ -1505,6 +1506,7 @@ export function checkInvariants(sql: SqlStorage): Violation[] { FROM ${agent_metadata} WHERE ${agent_metadata.status} = 'working' AND ${agent_metadata.current_hook_bead_id} IS NULL + AND ${agent_metadata.role} != 'mayor' `, [] ), @@ -1516,26 +1518,27 @@ export function checkInvariants(sql: SqlStorage): Violation[] { }); } - // Invariant 5: Convoy beads should not be in_progress - const inProgressConvoys = z - .object({ bead_id: z.string() }) + // Invariant 5: Convoy beads should not be in unexpected states. + // Valid transient states: open, in_progress, in_review, closed. + const badStateConvoys = z + .object({ bead_id: z.string(), status: z.string() }) .array() .parse([ ...query( sql, /* sql */ ` - SELECT ${beads.bead_id} + SELECT ${beads.bead_id}, ${beads.status} FROM ${beads} WHERE ${beads.type} = 'convoy' - AND ${beads.status} = 'in_progress' + AND ${beads.status} NOT IN ('open', 'in_progress', 'in_review', 'closed') `, [] ), ]); - for (const c of inProgressConvoys) { + for (const c of badStateConvoys) { violations.push({ invariant: 5, - message: `Convoy bead ${c.bead_id} is in_progress (should only be open or closed)`, + message: `Convoy bead ${c.bead_id} is in unexpected state '${c.status}'`, }); } From 884daf85d966f52e876b751e6f461b13d58351e4 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 03:52:37 +0000 Subject: [PATCH 2/7] fix(gastown): preserve org billing context across model changes (#1756) Store organizationId as a standalone env var (GASTOWN_ORGANIZATION_ID) that survives KILO_CONFIG_CONTENT rebuilds. Three defense layers: 1. Set GASTOWN_ORGANIZATION_ID on /agents/start and read it first in extractOrganizationId() before falling back to config extraction. 2. Pass organizationId in the PATCH /model request body so the container receives it even if X-Town-Config parsing fails. 3. Include organization_id in buildContainerConfig and sync it to process.env in the PATCH handler's X-Town-Config processing. --- .../container/src/control-server.ts | 16 ++++++++++++++++ .../container/src/process-manager.ts | 6 ++++++ cloudflare-gastown/container/src/types.ts | 2 ++ cloudflare-gastown/src/dos/Town.do.ts | 8 +++++++- cloudflare-gastown/src/dos/town/config.ts | 1 + .../src/dos/town/container-dispatch.ts | 4 +++- 6 files changed, 35 insertions(+), 2 deletions(-) diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index 70ba58c62..15b481355 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -142,6 +142,11 @@ app.post('/agents/start', async c => { return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400); } + // Persist the organization ID as a standalone env var so it survives + // config rebuilds (e.g. model hot-swap). The env var is the primary + // source of truth; KILO_CONFIG_CONTENT extraction is the fallback. + process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId ?? ''; + console.log( `[control-server] /agents/start: role=${parsed.data.role} name=${parsed.data.name} rigId=${parsed.data.rigId} agentId=${parsed.data.agentId}` ); @@ -211,6 +216,11 @@ app.patch('/agents/:agentId/model', async c => { return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400); } + // Update org billing context from the request body if provided. + if (parsed.data.organizationId) { + process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId; + } + // Sync config-derived env vars from X-Town-Config into process.env so // the SDK server restart picks up fresh tokens and git identity. // The middleware already parsed the header into lastKnownTownConfig. @@ -252,6 +262,12 @@ app.patch('/agents/:agentId/model', async c => { } else { delete process.env.GASTOWN_DISABLE_AI_COAUTHOR; } + // organization_id — keep the standalone env var in sync with the + // town config so org billing context is never lost. + const orgId = cfg.organization_id; + if (typeof orgId === 'string' && orgId) { + process.env.GASTOWN_ORGANIZATION_ID = orgId; + } } await updateAgentModel( diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index f0008e320..e24ef0b18 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -750,6 +750,12 @@ export async function sendMessage(agentId: string, prompt: string): Promise; diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index b3eeaa158..f2c391df7 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -2067,6 +2067,11 @@ export class TownDO extends DurableObject { // before restarting the SDK server (tokens, git identity, etc.). const containerConfig = await config.buildContainerConfig(this.ctx.storage, this.env); + // Resolve townConfig to thread the organization_id into the request body + // (belt-and-suspenders: ensures org billing survives even if X-Town-Config + // header parsing fails on the container side). + const townConfig = await config.getTownConfig(this.ctx.storage); + const updated = await dispatch.updateAgentModelInContainer( this.env, townId, @@ -2074,7 +2079,8 @@ export class TownDO extends DurableObject { model, smallModel, conversationHistory || undefined, - containerConfig + containerConfig, + townConfig.organization_id ); if (updated) { console.log( diff --git a/cloudflare-gastown/src/dos/town/config.ts b/cloudflare-gastown/src/dos/town/config.ts index 0d064ed6b..05a90d827 100644 --- a/cloudflare-gastown/src/dos/town/config.ts +++ b/cloudflare-gastown/src/dos/town/config.ts @@ -150,5 +150,6 @@ export async function buildContainerConfig( disable_ai_coauthor: config.disable_ai_coauthor, kilo_api_url: env.KILO_API_URL ?? '', gastown_api_url: env.GASTOWN_API_URL ?? '', + organization_id: config.organization_id, }; } diff --git a/cloudflare-gastown/src/dos/town/container-dispatch.ts b/cloudflare-gastown/src/dos/town/container-dispatch.ts index 4932e92a5..65d6266aa 100644 --- a/cloudflare-gastown/src/dos/town/container-dispatch.ts +++ b/cloudflare-gastown/src/dos/town/container-dispatch.ts @@ -676,7 +676,8 @@ export async function updateAgentModelInContainer( model: string, smallModel?: string, conversationHistory?: string, - containerConfig?: Record + containerConfig?: Record, + organizationId?: string ): Promise { try { const container = getTownContainerStub(env, townId); @@ -691,6 +692,7 @@ export async function updateAgentModelInContainer( model, ...(smallModel ? { smallModel } : {}), ...(conversationHistory ? { conversationHistory } : {}), + ...(organizationId ? { organizationId } : {}), }), }); return response.ok; From 6f3decb406ac8af967151f009931432adedaeef0 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 03:56:26 +0000 Subject: [PATCH 3/7] fix(gastown): prevent duplicate session leak when restarting a starting agent Thread an AbortController through the startAgent() startup sequence so that when a restart is requested for an agent still in 'starting' status, the in-flight startup is cancelled before session.create() can produce an orphaned session. - Add startupAbortController field to ManagedAgent type - In startAgent: abort existing startup controller when agent is 'starting' - Check signal.aborted after each async step (ensureSDKServer, session.create, before session.prompt) - On abort: decrement sessionCount, remove agent entry, clean up SDK instance if no sessions remain - In stopAgent: also abort startupAbortController for 'starting' agents - Introduce StartupAbortedError to distinguish abort from real failures Fixes #1341 --- .../container/src/process-manager.ts | 71 +++++++++++++++++++ cloudflare-gastown/container/src/types.ts | 4 ++ 2 files changed, 75 insertions(+) diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index f0008e320..c79ab5bbc 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -546,6 +546,16 @@ export async function startAgent( console.log( `${MANAGER_LOG} startAgent: stopping existing session for ${request.agentId} (status=${existing.status})` ); + + // If the agent is still starting, abort the in-flight startup to prevent + // an orphaned session from being created after stopAgent returns. + if (existing.status === 'starting' && existing.startupAbortController) { + console.log( + `${MANAGER_LOG} startAgent: aborting in-flight startup for ${request.agentId}` + ); + existing.startupAbortController.abort(); + } + await stopAgent(request.agentId).catch(err => { console.warn( `${MANAGER_LOG} startAgent: failed to stop existing session for ${request.agentId}`, @@ -555,6 +565,7 @@ export async function startAgent( } const now = new Date().toISOString(); + const startupAbortController = new AbortController(); const agent: ManagedAgent = { agentId: request.agentId, rigId: request.rigId, @@ -579,15 +590,22 @@ export async function startAgent( completionCallbackUrl: request.envVars?.GASTOWN_COMPLETION_CALLBACK_URL ?? null, model: request.model ?? null, startupEnv: env, + startupAbortController, }; agents.set(request.agentId, agent); + const { signal } = startupAbortController; let sessionCounted = false; try { // 1. Ensure SDK server is running for this workdir const { client, port } = await ensureSDKServer(workdir, env); agent.serverPort = port; + // Check if startup was cancelled while waiting for the SDK server + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + // Track session count on the SDK instance const instance = sdkInstances.get(workdir); if (instance) { @@ -597,6 +615,13 @@ export async function startAgent( // 2. Create a session const sessionResult = await client.session.create({ body: {} }); + + // Check if startup was cancelled while creating the session — this is + // the critical window where an orphaned session would leak. + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + const rawSession: unknown = sessionResult.data ?? sessionResult; const parsed = SessionResponse.safeParse(rawSession); if (!parsed.success) { @@ -622,6 +647,11 @@ export async function startAgent( modelParam = { providerID: 'kilo', modelID: request.model }; } + // Final abort check before sending the prompt + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + await client.session.prompt({ path: { id: sessionId }, body: { @@ -634,6 +664,7 @@ export async function startAgent( if (agent.status === 'starting') { agent.status = 'running'; } + agent.startupAbortController = null; agent.messageCount = 1; log.info('agent.start', { @@ -646,7 +677,28 @@ export async function startAgent( return agent; } catch (err) { + // On abort, clean up silently — the new startAgent invocation will + // proceed with a fresh entry. + if (err instanceof StartupAbortedError) { + console.log( + `${MANAGER_LOG} startAgent: startup aborted for ${request.agentId}, cleaning up` + ); + if (sessionCounted) { + const instance = sdkInstances.get(workdir); + if (instance) { + instance.sessionCount--; + if (instance.sessionCount <= 0) { + instance.server.close(); + sdkInstances.delete(workdir); + } + } + } + agents.delete(request.agentId); + throw err; + } + agent.status = 'failed'; + agent.startupAbortController = null; agent.exitReason = err instanceof Error ? err.message : String(err); if (sessionCounted) { const instance = sdkInstances.get(workdir); @@ -656,6 +708,18 @@ export async function startAgent( } } +/** + * Thrown when a startup sequence is cancelled via AbortController. + * Distinct from other errors so the catch block can clean up without + * marking the agent as failed (a new startup is taking over). + */ +class StartupAbortedError extends Error { + constructor(agentId: string) { + super(`Startup aborted for agent ${agentId}`); + this.name = 'StartupAbortedError'; + } +} + /** * Stop an agent by aborting its session. */ @@ -664,6 +728,13 @@ export async function stopAgent(agentId: string): Promise { if (!agent) throw new Error(`Agent ${agentId} not found`); if (agent.status !== 'running' && agent.status !== 'starting') return; + // If still starting, abort the in-flight startup so session.create() + // doesn't produce an orphaned session after we return. + if (agent.startupAbortController) { + agent.startupAbortController.abort(); + agent.startupAbortController = null; + } + agent.status = 'stopping'; // Cancel any pending idle timer diff --git a/cloudflare-gastown/container/src/types.ts b/cloudflare-gastown/container/src/types.ts index da458e349..fb24d608e 100644 --- a/cloudflare-gastown/container/src/types.ts +++ b/cloudflare-gastown/container/src/types.ts @@ -133,6 +133,10 @@ export type ManagedAgent = { model: string | null; /** Full env dict from buildAgentEnv, stored so model hot-swap can replay it. */ startupEnv: Record; + /** AbortController for the in-flight startup sequence. Aborted when a + * restart is requested while the agent is still in 'starting' status, + * preventing orphaned sessions from leaking. */ + startupAbortController: AbortController | null; }; export type AgentStatusResponse = { From d67b8be837a4f993ee14b2fe4a7258934d15976e Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 04:27:56 +0000 Subject: [PATCH 4/7] fix(gastown): prevent MR beads stuck in_progress when github_token missing (#1632) - Fall back to github_cli_pat then platform integration token in checkPRStatus - Track consecutive null poll results; fail MR bead after 10 nulls - Rate-limit PR polling to once per minute per MR bead via last_poll_at metadata - Store failureReason and failureMessage in bead metadata on permanent failure --- cloudflare-gastown/src/dos/Town.do.ts | 19 +++- cloudflare-gastown/src/dos/town/actions.ts | 90 +++++++++++++++++-- cloudflare-gastown/src/dos/town/reconciler.ts | 23 +++-- 3 files changed, 117 insertions(+), 15 deletions(-) diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index b3eeaa158..bc9268bd3 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -3498,9 +3498,24 @@ export class TownDO extends DurableObject { const ghMatch = prUrl.match(/^https:\/\/github\.com\/([^/]+)\/([^/]+)\/pull\/(\d+)/); if (ghMatch) { const [, owner, repo, numberStr] = ghMatch; - const token = townConfig.git_auth.github_token; + // Fix 1 & 2: Token fallback chain — github_token → github_cli_pat → platform integration + let token = townConfig.git_auth.github_token ?? townConfig.github_cli_pat; if (!token) { - console.warn(`${TOWN_LOG} checkPRStatus: no github_token configured, cannot poll ${prUrl}`); + // Try resolving from GitHub App installation as final fallback + const integrationId = townConfig.git_auth.platform_integration_id; + if (integrationId && this.env.GIT_TOKEN_SERVICE) { + try { + token = await this.env.GIT_TOKEN_SERVICE.getToken(integrationId); + } catch (err) { + console.warn( + `${TOWN_LOG} checkPRStatus: platform integration token lookup failed for ${integrationId}`, + err + ); + } + } + } + if (!token) { + console.warn(`${TOWN_LOG} checkPRStatus: no github token available, cannot poll ${prUrl}`); return null; } diff --git a/cloudflare-gastown/src/dos/town/actions.ts b/cloudflare-gastown/src/dos/town/actions.ts index 5f799ec90..3c4f75e6f 100644 --- a/cloudflare-gastown/src/dos/town/actions.ts +++ b/cloudflare-gastown/src/dos/town/actions.ts @@ -257,6 +257,12 @@ export type ApplyActionContext = { const LOG = '[actions]'; +/** Fail MR bead after this many consecutive null poll results (#1632). */ +const PR_POLL_NULL_THRESHOLD = 10; + +/** Minimum interval between PR polls per MR bead (ms) (#1632). */ +export const PR_POLL_INTERVAL_MS = 60_000; // 1 minute + function now(): string { return new Date().toISOString(); } @@ -545,30 +551,100 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro } case 'poll_pr': { - // Touch updated_at synchronously so the bead doesn't look stale - // to Rule 4 (orphaned PR review, 30 min timeout). Without this, - // active polling keeps the PR alive but updated_at was set once - // at PR creation and never refreshed, causing a false "orphaned" - // failure after 30 minutes. + // Touch updated_at and record last_poll_at synchronously so the bead + // doesn't look stale to Rule 4 (orphaned PR review, 30 min timeout). + // Without this, active polling keeps the PR alive but updated_at was + // set once at PR creation and never refreshed, causing a false + // "orphaned" failure after 30 minutes. + const timestamp = now(); query( sql, /* sql */ ` UPDATE ${beads} - SET ${beads.columns.updated_at} = ? + SET ${beads.columns.updated_at} = ?, + ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.last_poll_at', ? + ) WHERE ${beads.bead_id} = ? `, - [now(), action.bead_id] + [timestamp, timestamp, action.bead_id] ); return async () => { try { const status = await ctx.checkPRStatus(action.pr_url); if (status && status !== 'open') { + // Successful non-open status — reset null counter and emit event + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.poll_null_count', 0 + ) + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ); ctx.insertEvent('pr_status_changed', { bead_id: action.bead_id, payload: { pr_url: action.pr_url, pr_state: status }, }); + } else if (status === null) { + // Null result — increment consecutive null counter + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.poll_null_count', + COALESCE( + json_extract(${beads.columns.metadata}, '$.poll_null_count'), + 0 + ) + 1 + ) + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ); + // Read back the updated count + const rows = [ + ...query( + sql, + /* sql */ ` + SELECT json_extract(${beads.columns.metadata}, '$.poll_null_count') AS null_count + FROM ${beads} + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ), + ]; + const nullCount = Number(rows[0]?.null_count ?? 0); + if (nullCount >= PR_POLL_NULL_THRESHOLD) { + console.warn( + `${LOG} poll_pr: ${nullCount} consecutive null results for bead=${action.bead_id}, failing` + ); + beadOps.updateBeadStatus(sql, action.bead_id, 'failed', 'system'); + // Store failure reason in metadata + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.failureReason', 'no_github_token', + '$.failureMessage', 'Cannot poll PR status — no GitHub token configured. Please add a token in town settings.' + ) + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ); + } } + // status === 'open' — no action needed, poll again next tick } catch (err) { console.warn(`${LOG} poll_pr failed: bead=${action.bead_id} url=${action.pr_url}`, err); } diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index e9fef8d21..22c47c6d8 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -30,6 +30,7 @@ import * as reviewQueue from './review-queue'; import * as agents from './agents'; import * as beadOps from './beads'; import { getRig } from './rigs'; +import { PR_POLL_INTERVAL_MS } from './actions'; import type { Action } from './actions'; import type { TownEventRecord } from '../../db/tables/town-events.table'; @@ -98,6 +99,7 @@ const MrBeadRow = BeadRecord.pick({ rig_id: true, updated_at: true, assignee_agent_bead_id: true, + metadata: true, }).extend({ // Joined from review_metadata pr_url: ReviewMetadataRecord.shape.pr_url, @@ -753,6 +755,7 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { /* sql */ ` SELECT b.${beads.columns.bead_id}, b.${beads.columns.status}, b.${beads.columns.rig_id}, b.${beads.columns.updated_at}, + b.${beads.columns.metadata}, rm.${review_metadata.columns.pr_url}, b.${beads.columns.assignee_agent_bead_id} FROM ${beads} b @@ -765,13 +768,21 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { ]); for (const mr of mrBeads) { - // Rule 1: PR-strategy MR beads in_progress need polling + // Rule 1: PR-strategy MR beads in_progress need polling. + // Rate-limit: skip if polled less than PR_POLL_INTERVAL_MS ago (#1632). if (mr.status === 'in_progress' && mr.pr_url) { - actions.push({ - type: 'poll_pr', - bead_id: mr.bead_id, - pr_url: mr.pr_url, - }); + const lastPollAt = mr.metadata?.last_poll_at; + const msSinceLastPoll = typeof lastPollAt === 'string' + ? Date.now() - new Date(lastPollAt).getTime() + : Infinity; + + if (msSinceLastPoll >= PR_POLL_INTERVAL_MS) { + actions.push({ + type: 'poll_pr', + bead_id: mr.bead_id, + pr_url: mr.pr_url, + }); + } } // Rule 2: Stuck MR beads in_progress with no PR, no working agent, stale >30min From 16fa42f78fb2bf02a488b191163505faaa364393 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 04:37:40 +0000 Subject: [PATCH 5/7] fix: format and lint errors in reconciler.ts - Add explicit 'unknown' type annotation to lastPollAt to fix no-unsafe-assignment lint error - Auto-format ternary expression per oxfmt rules --- cloudflare-gastown/src/dos/town/reconciler.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index 22c47c6d8..b76333788 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -771,10 +771,9 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { // Rule 1: PR-strategy MR beads in_progress need polling. // Rate-limit: skip if polled less than PR_POLL_INTERVAL_MS ago (#1632). if (mr.status === 'in_progress' && mr.pr_url) { - const lastPollAt = mr.metadata?.last_poll_at; - const msSinceLastPoll = typeof lastPollAt === 'string' - ? Date.now() - new Date(lastPollAt).getTime() - : Infinity; + const lastPollAt: unknown = mr.metadata?.last_poll_at; + const msSinceLastPoll = + typeof lastPollAt === 'string' ? Date.now() - new Date(lastPollAt).getTime() : Infinity; if (msSinceLastPoll >= PR_POLL_INTERVAL_MS) { actions.push({ From ab7d0ca462bffa896d5fe8f73a45995400a74ab8 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Wed, 1 Apr 2026 21:53:47 +0000 Subject: [PATCH 6/7] =?UTF-8?q?fix(gastown):=20address=20review=20comments?= =?UTF-8?q?=20=E2=80=94=20reset=20poll=5Fnull=5Fcount=20on=20all=20non-nul?= =?UTF-8?q?l=20polls,=20abort=20orphaned=20session=20on=20startup=20abort,?= =?UTF-8?q?=20guard=20agents.delete=20with=20identity=20check?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../container/src/process-manager.ts | 15 ++++++++++++++- cloudflare-gastown/src/dos/town/actions.ts | 16 +++++++++------- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index 912377c39..e4cd2fa66 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -686,6 +686,17 @@ export async function startAgent( if (sessionCounted) { const instance = sdkInstances.get(workdir); if (instance) { + // Abort the orphaned session if one was created before the abort + if (agent.sessionId) { + try { + await instance.client.session.abort({ path: { id: agent.sessionId } }); + } catch (abortErr) { + console.error( + `${MANAGER_LOG} startAgent: failed to abort orphaned session ${agent.sessionId}:`, + abortErr + ); + } + } instance.sessionCount--; if (instance.sessionCount <= 0) { instance.server.close(); @@ -693,7 +704,9 @@ export async function startAgent( } } } - agents.delete(request.agentId); + if (agents.get(request.agentId) === agent) { + agents.delete(request.agentId); + } throw err; } diff --git a/cloudflare-gastown/src/dos/town/actions.ts b/cloudflare-gastown/src/dos/town/actions.ts index 3c4f75e6f..39d6ae493 100644 --- a/cloudflare-gastown/src/dos/town/actions.ts +++ b/cloudflare-gastown/src/dos/town/actions.ts @@ -574,8 +574,8 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro return async () => { try { const status = await ctx.checkPRStatus(action.pr_url); - if (status && status !== 'open') { - // Successful non-open status — reset null counter and emit event + if (status !== null) { + // Any non-null result resets the consecutive null counter query( sql, /* sql */ ` @@ -588,11 +588,13 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro `, [action.bead_id] ); - ctx.insertEvent('pr_status_changed', { - bead_id: action.bead_id, - payload: { pr_url: action.pr_url, pr_state: status }, - }); - } else if (status === null) { + if (status !== 'open') { + ctx.insertEvent('pr_status_changed', { + bead_id: action.bead_id, + payload: { pr_url: action.pr_url, pr_state: status }, + }); + } + } else { // Null result — increment consecutive null counter query( sql, From 83337e4d36fa70ef5dde7ce870d92dcf09f015b8 Mon Sep 17 00:00:00 2001 From: John Fawcett Date: Thu, 2 Apr 2026 14:53:22 +0000 Subject: [PATCH 7/7] fix(gastown): store session ID before abort check to prevent orphaned session leak After session.create() resolves, parse and store the session ID on the agent object BEFORE checking signal.aborted. This ensures the catch block has agent.sessionId available to call session.abort(), closing the race window where an abort during session.create() could leak an orphaned session. Also fixes format-check CI by running oxfmt. --- .../container/src/process-manager.ts | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index e4cd2fa66..8c027d99b 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -550,9 +550,7 @@ export async function startAgent( // If the agent is still starting, abort the in-flight startup to prevent // an orphaned session from being created after stopAgent returns. if (existing.status === 'starting' && existing.startupAbortController) { - console.log( - `${MANAGER_LOG} startAgent: aborting in-flight startup for ${request.agentId}` - ); + console.log(`${MANAGER_LOG} startAgent: aborting in-flight startup for ${request.agentId}`); existing.startupAbortController.abort(); } @@ -616,12 +614,9 @@ export async function startAgent( // 2. Create a session const sessionResult = await client.session.create({ body: {} }); - // Check if startup was cancelled while creating the session — this is - // the critical window where an orphaned session would leak. - if (signal.aborted) { - throw new StartupAbortedError(request.agentId); - } - + // Parse and store the session ID immediately so the catch block can + // abort an orphaned session if startupAbortController fires during + // the await above. const rawSession: unknown = sessionResult.data ?? sessionResult; const parsed = SessionResponse.safeParse(rawSession); if (!parsed.success) { @@ -635,6 +630,12 @@ export async function startAgent( const sessionId = parsed.data.id; agent.sessionId = sessionId; + // Now check if startup was cancelled while creating the session. + // agent.sessionId is already set, so the catch block will abort it. + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + // 3. Subscribe to events (async, runs in background) void subscribeToEvents(client, agent, request); @@ -680,9 +681,7 @@ export async function startAgent( // On abort, clean up silently — the new startAgent invocation will // proceed with a fresh entry. if (err instanceof StartupAbortedError) { - console.log( - `${MANAGER_LOG} startAgent: startup aborted for ${request.agentId}, cleaning up` - ); + console.log(`${MANAGER_LOG} startAgent: startup aborted for ${request.agentId}, cleaning up`); if (sessionCounted) { const instance = sdkInstances.get(workdir); if (instance) {