diff --git a/cloudflare-gastown/container/src/control-server.ts b/cloudflare-gastown/container/src/control-server.ts index 70ba58c62..15b481355 100644 --- a/cloudflare-gastown/container/src/control-server.ts +++ b/cloudflare-gastown/container/src/control-server.ts @@ -142,6 +142,11 @@ app.post('/agents/start', async c => { return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400); } + // Persist the organization ID as a standalone env var so it survives + // config rebuilds (e.g. model hot-swap). The env var is the primary + // source of truth; KILO_CONFIG_CONTENT extraction is the fallback. + process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId ?? ''; + console.log( `[control-server] /agents/start: role=${parsed.data.role} name=${parsed.data.name} rigId=${parsed.data.rigId} agentId=${parsed.data.agentId}` ); @@ -211,6 +216,11 @@ app.patch('/agents/:agentId/model', async c => { return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400); } + // Update org billing context from the request body if provided. + if (parsed.data.organizationId) { + process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId; + } + // Sync config-derived env vars from X-Town-Config into process.env so // the SDK server restart picks up fresh tokens and git identity. // The middleware already parsed the header into lastKnownTownConfig. @@ -252,6 +262,12 @@ app.patch('/agents/:agentId/model', async c => { } else { delete process.env.GASTOWN_DISABLE_AI_COAUTHOR; } + // organization_id — keep the standalone env var in sync with the + // town config so org billing context is never lost. + const orgId = cfg.organization_id; + if (typeof orgId === 'string' && orgId) { + process.env.GASTOWN_ORGANIZATION_ID = orgId; + } } await updateAgentModel( diff --git a/cloudflare-gastown/container/src/process-manager.ts b/cloudflare-gastown/container/src/process-manager.ts index f0008e320..8c027d99b 100644 --- a/cloudflare-gastown/container/src/process-manager.ts +++ b/cloudflare-gastown/container/src/process-manager.ts @@ -546,6 +546,14 @@ export async function startAgent( console.log( `${MANAGER_LOG} startAgent: stopping existing session for ${request.agentId} (status=${existing.status})` ); + + // If the agent is still starting, abort the in-flight startup to prevent + // an orphaned session from being created after stopAgent returns. + if (existing.status === 'starting' && existing.startupAbortController) { + console.log(`${MANAGER_LOG} startAgent: aborting in-flight startup for ${request.agentId}`); + existing.startupAbortController.abort(); + } + await stopAgent(request.agentId).catch(err => { console.warn( `${MANAGER_LOG} startAgent: failed to stop existing session for ${request.agentId}`, @@ -555,6 +563,7 @@ export async function startAgent( } const now = new Date().toISOString(); + const startupAbortController = new AbortController(); const agent: ManagedAgent = { agentId: request.agentId, rigId: request.rigId, @@ -579,15 +588,22 @@ export async function startAgent( completionCallbackUrl: request.envVars?.GASTOWN_COMPLETION_CALLBACK_URL ?? null, model: request.model ?? null, startupEnv: env, + startupAbortController, }; agents.set(request.agentId, agent); + const { signal } = startupAbortController; let sessionCounted = false; try { // 1. Ensure SDK server is running for this workdir const { client, port } = await ensureSDKServer(workdir, env); agent.serverPort = port; + // Check if startup was cancelled while waiting for the SDK server + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + // Track session count on the SDK instance const instance = sdkInstances.get(workdir); if (instance) { @@ -597,6 +613,10 @@ export async function startAgent( // 2. Create a session const sessionResult = await client.session.create({ body: {} }); + + // Parse and store the session ID immediately so the catch block can + // abort an orphaned session if startupAbortController fires during + // the await above. const rawSession: unknown = sessionResult.data ?? sessionResult; const parsed = SessionResponse.safeParse(rawSession); if (!parsed.success) { @@ -610,6 +630,12 @@ export async function startAgent( const sessionId = parsed.data.id; agent.sessionId = sessionId; + // Now check if startup was cancelled while creating the session. + // agent.sessionId is already set, so the catch block will abort it. + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + // 3. Subscribe to events (async, runs in background) void subscribeToEvents(client, agent, request); @@ -622,6 +648,11 @@ export async function startAgent( modelParam = { providerID: 'kilo', modelID: request.model }; } + // Final abort check before sending the prompt + if (signal.aborted) { + throw new StartupAbortedError(request.agentId); + } + await client.session.prompt({ path: { id: sessionId }, body: { @@ -634,6 +665,7 @@ export async function startAgent( if (agent.status === 'starting') { agent.status = 'running'; } + agent.startupAbortController = null; agent.messageCount = 1; log.info('agent.start', { @@ -646,7 +678,39 @@ export async function startAgent( return agent; } catch (err) { + // On abort, clean up silently — the new startAgent invocation will + // proceed with a fresh entry. + if (err instanceof StartupAbortedError) { + console.log(`${MANAGER_LOG} startAgent: startup aborted for ${request.agentId}, cleaning up`); + if (sessionCounted) { + const instance = sdkInstances.get(workdir); + if (instance) { + // Abort the orphaned session if one was created before the abort + if (agent.sessionId) { + try { + await instance.client.session.abort({ path: { id: agent.sessionId } }); + } catch (abortErr) { + console.error( + `${MANAGER_LOG} startAgent: failed to abort orphaned session ${agent.sessionId}:`, + abortErr + ); + } + } + instance.sessionCount--; + if (instance.sessionCount <= 0) { + instance.server.close(); + sdkInstances.delete(workdir); + } + } + } + if (agents.get(request.agentId) === agent) { + agents.delete(request.agentId); + } + throw err; + } + agent.status = 'failed'; + agent.startupAbortController = null; agent.exitReason = err instanceof Error ? err.message : String(err); if (sessionCounted) { const instance = sdkInstances.get(workdir); @@ -656,6 +720,18 @@ export async function startAgent( } } +/** + * Thrown when a startup sequence is cancelled via AbortController. + * Distinct from other errors so the catch block can clean up without + * marking the agent as failed (a new startup is taking over). + */ +class StartupAbortedError extends Error { + constructor(agentId: string) { + super(`Startup aborted for agent ${agentId}`); + this.name = 'StartupAbortedError'; + } +} + /** * Stop an agent by aborting its session. */ @@ -664,6 +740,13 @@ export async function stopAgent(agentId: string): Promise { if (!agent) throw new Error(`Agent ${agentId} not found`); if (agent.status !== 'running' && agent.status !== 'starting') return; + // If still starting, abort the in-flight startup so session.create() + // doesn't produce an orphaned session after we return. + if (agent.startupAbortController) { + agent.startupAbortController.abort(); + agent.startupAbortController = null; + } + agent.status = 'stopping'; // Cancel any pending idle timer @@ -750,6 +833,12 @@ export async function sendMessage(agentId: string, prompt: string): Promise; @@ -133,6 +135,10 @@ export type ManagedAgent = { model: string | null; /** Full env dict from buildAgentEnv, stored so model hot-swap can replay it. */ startupEnv: Record; + /** AbortController for the in-flight startup sequence. Aborted when a + * restart is requested while the agent is still in 'starting' status, + * preventing orphaned sessions from leaking. */ + startupAbortController: AbortController | null; }; export type AgentStatusResponse = { diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index b3eeaa158..a8e998916 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -2067,6 +2067,11 @@ export class TownDO extends DurableObject { // before restarting the SDK server (tokens, git identity, etc.). const containerConfig = await config.buildContainerConfig(this.ctx.storage, this.env); + // Resolve townConfig to thread the organization_id into the request body + // (belt-and-suspenders: ensures org billing survives even if X-Town-Config + // header parsing fails on the container side). + const townConfig = await config.getTownConfig(this.ctx.storage); + const updated = await dispatch.updateAgentModelInContainer( this.env, townId, @@ -2074,7 +2079,8 @@ export class TownDO extends DurableObject { model, smallModel, conversationHistory || undefined, - containerConfig + containerConfig, + townConfig.organization_id ); if (updated) { console.log( @@ -3498,9 +3504,24 @@ export class TownDO extends DurableObject { const ghMatch = prUrl.match(/^https:\/\/github\.com\/([^/]+)\/([^/]+)\/pull\/(\d+)/); if (ghMatch) { const [, owner, repo, numberStr] = ghMatch; - const token = townConfig.git_auth.github_token; + // Fix 1 & 2: Token fallback chain — github_token → github_cli_pat → platform integration + let token = townConfig.git_auth.github_token ?? townConfig.github_cli_pat; + if (!token) { + // Try resolving from GitHub App installation as final fallback + const integrationId = townConfig.git_auth.platform_integration_id; + if (integrationId && this.env.GIT_TOKEN_SERVICE) { + try { + token = await this.env.GIT_TOKEN_SERVICE.getToken(integrationId); + } catch (err) { + console.warn( + `${TOWN_LOG} checkPRStatus: platform integration token lookup failed for ${integrationId}`, + err + ); + } + } + } if (!token) { - console.warn(`${TOWN_LOG} checkPRStatus: no github_token configured, cannot poll ${prUrl}`); + console.warn(`${TOWN_LOG} checkPRStatus: no github token available, cannot poll ${prUrl}`); return null; } diff --git a/cloudflare-gastown/src/dos/town/actions.ts b/cloudflare-gastown/src/dos/town/actions.ts index 5f799ec90..39d6ae493 100644 --- a/cloudflare-gastown/src/dos/town/actions.ts +++ b/cloudflare-gastown/src/dos/town/actions.ts @@ -257,6 +257,12 @@ export type ApplyActionContext = { const LOG = '[actions]'; +/** Fail MR bead after this many consecutive null poll results (#1632). */ +const PR_POLL_NULL_THRESHOLD = 10; + +/** Minimum interval between PR polls per MR bead (ms) (#1632). */ +export const PR_POLL_INTERVAL_MS = 60_000; // 1 minute + function now(): string { return new Date().toISOString(); } @@ -545,30 +551,102 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro } case 'poll_pr': { - // Touch updated_at synchronously so the bead doesn't look stale - // to Rule 4 (orphaned PR review, 30 min timeout). Without this, - // active polling keeps the PR alive but updated_at was set once - // at PR creation and never refreshed, causing a false "orphaned" - // failure after 30 minutes. + // Touch updated_at and record last_poll_at synchronously so the bead + // doesn't look stale to Rule 4 (orphaned PR review, 30 min timeout). + // Without this, active polling keeps the PR alive but updated_at was + // set once at PR creation and never refreshed, causing a false + // "orphaned" failure after 30 minutes. + const timestamp = now(); query( sql, /* sql */ ` UPDATE ${beads} - SET ${beads.columns.updated_at} = ? + SET ${beads.columns.updated_at} = ?, + ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.last_poll_at', ? + ) WHERE ${beads.bead_id} = ? `, - [now(), action.bead_id] + [timestamp, timestamp, action.bead_id] ); return async () => { try { const status = await ctx.checkPRStatus(action.pr_url); - if (status && status !== 'open') { - ctx.insertEvent('pr_status_changed', { - bead_id: action.bead_id, - payload: { pr_url: action.pr_url, pr_state: status }, - }); + if (status !== null) { + // Any non-null result resets the consecutive null counter + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.poll_null_count', 0 + ) + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ); + if (status !== 'open') { + ctx.insertEvent('pr_status_changed', { + bead_id: action.bead_id, + payload: { pr_url: action.pr_url, pr_state: status }, + }); + } + } else { + // Null result — increment consecutive null counter + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.poll_null_count', + COALESCE( + json_extract(${beads.columns.metadata}, '$.poll_null_count'), + 0 + ) + 1 + ) + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ); + // Read back the updated count + const rows = [ + ...query( + sql, + /* sql */ ` + SELECT json_extract(${beads.columns.metadata}, '$.poll_null_count') AS null_count + FROM ${beads} + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ), + ]; + const nullCount = Number(rows[0]?.null_count ?? 0); + if (nullCount >= PR_POLL_NULL_THRESHOLD) { + console.warn( + `${LOG} poll_pr: ${nullCount} consecutive null results for bead=${action.bead_id}, failing` + ); + beadOps.updateBeadStatus(sql, action.bead_id, 'failed', 'system'); + // Store failure reason in metadata + query( + sql, + /* sql */ ` + UPDATE ${beads} + SET ${beads.columns.metadata} = json_set( + COALESCE(${beads.columns.metadata}, '{}'), + '$.failureReason', 'no_github_token', + '$.failureMessage', 'Cannot poll PR status — no GitHub token configured. Please add a token in town settings.' + ) + WHERE ${beads.bead_id} = ? + `, + [action.bead_id] + ); + } } + // status === 'open' — no action needed, poll again next tick } catch (err) { console.warn(`${LOG} poll_pr failed: bead=${action.bead_id} url=${action.pr_url}`, err); } diff --git a/cloudflare-gastown/src/dos/town/config.ts b/cloudflare-gastown/src/dos/town/config.ts index 0d064ed6b..05a90d827 100644 --- a/cloudflare-gastown/src/dos/town/config.ts +++ b/cloudflare-gastown/src/dos/town/config.ts @@ -150,5 +150,6 @@ export async function buildContainerConfig( disable_ai_coauthor: config.disable_ai_coauthor, kilo_api_url: env.KILO_API_URL ?? '', gastown_api_url: env.GASTOWN_API_URL ?? '', + organization_id: config.organization_id, }; } diff --git a/cloudflare-gastown/src/dos/town/container-dispatch.ts b/cloudflare-gastown/src/dos/town/container-dispatch.ts index 4932e92a5..65d6266aa 100644 --- a/cloudflare-gastown/src/dos/town/container-dispatch.ts +++ b/cloudflare-gastown/src/dos/town/container-dispatch.ts @@ -676,7 +676,8 @@ export async function updateAgentModelInContainer( model: string, smallModel?: string, conversationHistory?: string, - containerConfig?: Record + containerConfig?: Record, + organizationId?: string ): Promise { try { const container = getTownContainerStub(env, townId); @@ -691,6 +692,7 @@ export async function updateAgentModelInContainer( model, ...(smallModel ? { smallModel } : {}), ...(conversationHistory ? { conversationHistory } : {}), + ...(organizationId ? { organizationId } : {}), }), }); return response.ok; diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index e9fef8d21..1e472b48b 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -30,6 +30,7 @@ import * as reviewQueue from './review-queue'; import * as agents from './agents'; import * as beadOps from './beads'; import { getRig } from './rigs'; +import { PR_POLL_INTERVAL_MS } from './actions'; import type { Action } from './actions'; import type { TownEventRecord } from '../../db/tables/town-events.table'; @@ -98,6 +99,7 @@ const MrBeadRow = BeadRecord.pick({ rig_id: true, updated_at: true, assignee_agent_bead_id: true, + metadata: true, }).extend({ // Joined from review_metadata pr_url: ReviewMetadataRecord.shape.pr_url, @@ -753,6 +755,7 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { /* sql */ ` SELECT b.${beads.columns.bead_id}, b.${beads.columns.status}, b.${beads.columns.rig_id}, b.${beads.columns.updated_at}, + b.${beads.columns.metadata}, rm.${review_metadata.columns.pr_url}, b.${beads.columns.assignee_agent_bead_id} FROM ${beads} b @@ -765,13 +768,20 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { ]); for (const mr of mrBeads) { - // Rule 1: PR-strategy MR beads in_progress need polling + // Rule 1: PR-strategy MR beads in_progress need polling. + // Rate-limit: skip if polled less than PR_POLL_INTERVAL_MS ago (#1632). if (mr.status === 'in_progress' && mr.pr_url) { - actions.push({ - type: 'poll_pr', - bead_id: mr.bead_id, - pr_url: mr.pr_url, - }); + const lastPollAt: unknown = mr.metadata?.last_poll_at; + const msSinceLastPoll = + typeof lastPollAt === 'string' ? Date.now() - new Date(lastPollAt).getTime() : Infinity; + + if (msSinceLastPoll >= PR_POLL_INTERVAL_MS) { + actions.push({ + type: 'poll_pr', + bead_id: mr.bead_id, + pr_url: mr.pr_url, + }); + } } // Rule 2: Stuck MR beads in_progress with no PR, no working agent, stale >30min @@ -1494,6 +1504,7 @@ export function checkInvariants(sql: SqlStorage): Violation[] { const violations: Violation[] = []; // Invariant 7: Working agents must have hooks + // Mayors are always 'working' and intentionally have no hook — exclude them. const unhookedWorkers = z .object({ bead_id: z.string() }) .array() @@ -1505,6 +1516,7 @@ export function checkInvariants(sql: SqlStorage): Violation[] { FROM ${agent_metadata} WHERE ${agent_metadata.status} = 'working' AND ${agent_metadata.current_hook_bead_id} IS NULL + AND ${agent_metadata.role} != 'mayor' `, [] ), @@ -1516,26 +1528,27 @@ export function checkInvariants(sql: SqlStorage): Violation[] { }); } - // Invariant 5: Convoy beads should not be in_progress - const inProgressConvoys = z - .object({ bead_id: z.string() }) + // Invariant 5: Convoy beads should not be in unexpected states. + // Valid transient states: open, in_progress, in_review, closed. + const badStateConvoys = z + .object({ bead_id: z.string(), status: z.string() }) .array() .parse([ ...query( sql, /* sql */ ` - SELECT ${beads.bead_id} + SELECT ${beads.bead_id}, ${beads.status} FROM ${beads} WHERE ${beads.type} = 'convoy' - AND ${beads.status} = 'in_progress' + AND ${beads.status} NOT IN ('open', 'in_progress', 'in_review', 'closed') `, [] ), ]); - for (const c of inProgressConvoys) { + for (const c of badStateConvoys) { violations.push({ invariant: 5, - message: `Convoy bead ${c.bead_id} is in_progress (should only be open or closed)`, + message: `Convoy bead ${c.bead_id} is in unexpected state '${c.status}'`, }); }