Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cloudflare-gastown/container/src/control-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ app.post('/agents/start', async c => {
return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400);
}

// Persist the organization ID as a standalone env var so it survives
// config rebuilds (e.g. model hot-swap). The env var is the primary
// source of truth; KILO_CONFIG_CONTENT extraction is the fallback.
process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId ?? '';

console.log(
`[control-server] /agents/start: role=${parsed.data.role} name=${parsed.data.name} rigId=${parsed.data.rigId} agentId=${parsed.data.agentId}`
);
Expand Down Expand Up @@ -211,6 +216,11 @@ app.patch('/agents/:agentId/model', async c => {
return c.json({ error: 'Invalid request body', issues: parsed.error.issues }, 400);
}

// Update org billing context from the request body if provided.
if (parsed.data.organizationId) {
process.env.GASTOWN_ORGANIZATION_ID = parsed.data.organizationId;
}

// Sync config-derived env vars from X-Town-Config into process.env so
// the SDK server restart picks up fresh tokens and git identity.
// The middleware already parsed the header into lastKnownTownConfig.
Expand Down Expand Up @@ -252,6 +262,12 @@ app.patch('/agents/:agentId/model', async c => {
} else {
delete process.env.GASTOWN_DISABLE_AI_COAUTHOR;
}
// organization_id — keep the standalone env var in sync with the
// town config so org billing context is never lost.
const orgId = cfg.organization_id;
if (typeof orgId === 'string' && orgId) {
process.env.GASTOWN_ORGANIZATION_ID = orgId;
}
}

await updateAgentModel(
Expand Down
89 changes: 89 additions & 0 deletions cloudflare-gastown/container/src/process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,14 @@ export async function startAgent(
console.log(
`${MANAGER_LOG} startAgent: stopping existing session for ${request.agentId} (status=${existing.status})`
);

// If the agent is still starting, abort the in-flight startup to prevent
// an orphaned session from being created after stopAgent returns.
if (existing.status === 'starting' && existing.startupAbortController) {
console.log(`${MANAGER_LOG} startAgent: aborting in-flight startup for ${request.agentId}`);
existing.startupAbortController.abort();
}

await stopAgent(request.agentId).catch(err => {
console.warn(
`${MANAGER_LOG} startAgent: failed to stop existing session for ${request.agentId}`,
Expand All @@ -555,6 +563,7 @@ export async function startAgent(
}

const now = new Date().toISOString();
const startupAbortController = new AbortController();
const agent: ManagedAgent = {
agentId: request.agentId,
rigId: request.rigId,
Expand All @@ -579,15 +588,22 @@ export async function startAgent(
completionCallbackUrl: request.envVars?.GASTOWN_COMPLETION_CALLBACK_URL ?? null,
model: request.model ?? null,
startupEnv: env,
startupAbortController,
};
agents.set(request.agentId, agent);

const { signal } = startupAbortController;
let sessionCounted = false;
try {
// 1. Ensure SDK server is running for this workdir
const { client, port } = await ensureSDKServer(workdir, env);
agent.serverPort = port;

// Check if startup was cancelled while waiting for the SDK server
if (signal.aborted) {
throw new StartupAbortedError(request.agentId);
}

// Track session count on the SDK instance
const instance = sdkInstances.get(workdir);
if (instance) {
Expand All @@ -597,6 +613,10 @@ export async function startAgent(

// 2. Create a session
const sessionResult = await client.session.create({ body: {} });

// Parse and store the session ID immediately so the catch block can
// abort an orphaned session if startupAbortController fires during
// the await above.
const rawSession: unknown = sessionResult.data ?? sessionResult;
const parsed = SessionResponse.safeParse(rawSession);
if (!parsed.success) {
Expand All @@ -610,6 +630,12 @@ export async function startAgent(
const sessionId = parsed.data.id;
agent.sessionId = sessionId;

// Now check if startup was cancelled while creating the session.
// agent.sessionId is already set, so the catch block will abort it.
if (signal.aborted) {
throw new StartupAbortedError(request.agentId);
}

// 3. Subscribe to events (async, runs in background)
void subscribeToEvents(client, agent, request);

Expand All @@ -622,6 +648,11 @@ export async function startAgent(
modelParam = { providerID: 'kilo', modelID: request.model };
}

// Final abort check before sending the prompt
if (signal.aborted) {
throw new StartupAbortedError(request.agentId);
}

await client.session.prompt({
path: { id: sessionId },
body: {
Expand All @@ -634,6 +665,7 @@ export async function startAgent(
if (agent.status === 'starting') {
agent.status = 'running';
}
agent.startupAbortController = null;
agent.messageCount = 1;

log.info('agent.start', {
Expand All @@ -646,7 +678,39 @@ export async function startAgent(

return agent;
} catch (err) {
// On abort, clean up silently — the new startAgent invocation will
// proceed with a fresh entry.
if (err instanceof StartupAbortedError) {
console.log(`${MANAGER_LOG} startAgent: startup aborted for ${request.agentId}, cleaning up`);
if (sessionCounted) {
const instance = sdkInstances.get(workdir);
if (instance) {
// Abort the orphaned session if one was created before the abort
if (agent.sessionId) {
try {
await instance.client.session.abort({ path: { id: agent.sessionId } });
} catch (abortErr) {
console.error(
`${MANAGER_LOG} startAgent: failed to abort orphaned session ${agent.sessionId}:`,
abortErr
);
}
}
instance.sessionCount--;
if (instance.sessionCount <= 0) {
instance.server.close();
sdkInstances.delete(workdir);
}
}
}
if (agents.get(request.agentId) === agent) {
agents.delete(request.agentId);
}
throw err;
}

agent.status = 'failed';
agent.startupAbortController = null;
agent.exitReason = err instanceof Error ? err.message : String(err);
if (sessionCounted) {
const instance = sdkInstances.get(workdir);
Expand All @@ -656,6 +720,18 @@ export async function startAgent(
}
}

/**
* Thrown when a startup sequence is cancelled via AbortController.
* Distinct from other errors so the catch block can clean up without
* marking the agent as failed (a new startup is taking over).
*/
class StartupAbortedError extends Error {
constructor(agentId: string) {
super(`Startup aborted for agent ${agentId}`);
this.name = 'StartupAbortedError';
}
}

/**
* Stop an agent by aborting its session.
*/
Expand All @@ -664,6 +740,13 @@ export async function stopAgent(agentId: string): Promise<void> {
if (!agent) throw new Error(`Agent ${agentId} not found`);
if (agent.status !== 'running' && agent.status !== 'starting') return;

// If still starting, abort the in-flight startup so session.create()
// doesn't produce an orphaned session after we return.
if (agent.startupAbortController) {
agent.startupAbortController.abort();
agent.startupAbortController = null;
}

agent.status = 'stopping';

// Cancel any pending idle timer
Expand Down Expand Up @@ -750,6 +833,12 @@ export async function sendMessage(agentId: string, prompt: string): Promise<void
* by `buildKiloConfigContent` at agent startup.
*/
function extractOrganizationId(): string | undefined {
// Primary source: standalone env var set by control-server on /agents/start
// and updated on every PATCH /model via X-Town-Config.
const envOrgId = process.env.GASTOWN_ORGANIZATION_ID;
if (envOrgId) return envOrgId;

// Fallback: extract from KILO_CONFIG_CONTENT (legacy path)
const raw = process.env.KILO_CONFIG_CONTENT;
if (!raw) return undefined;
try {
Expand Down
6 changes: 6 additions & 0 deletions cloudflare-gastown/container/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ export const UpdateAgentModelRequest = z.object({
smallModel: z.string().optional(),
/** Pre-formatted conversation history to inject into the new session prompt. */
conversationHistory: z.string().optional(),
/** Organization ID — ensures org billing context is preserved across model changes. */
organizationId: z.string().optional(),
});
export type UpdateAgentModelRequest = z.infer<typeof UpdateAgentModelRequest>;

Expand Down Expand Up @@ -133,6 +135,10 @@ export type ManagedAgent = {
model: string | null;
/** Full env dict from buildAgentEnv, stored so model hot-swap can replay it. */
startupEnv: Record<string, string>;
/** AbortController for the in-flight startup sequence. Aborted when a
* restart is requested while the agent is still in 'starting' status,
* preventing orphaned sessions from leaking. */
startupAbortController: AbortController | null;
};

export type AgentStatusResponse = {
Expand Down
27 changes: 24 additions & 3 deletions cloudflare-gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2067,14 +2067,20 @@ export class TownDO extends DurableObject<Env> {
// before restarting the SDK server (tokens, git identity, etc.).
const containerConfig = await config.buildContainerConfig(this.ctx.storage, this.env);

// Resolve townConfig to thread the organization_id into the request body
// (belt-and-suspenders: ensures org billing survives even if X-Town-Config
// header parsing fails on the container side).
const townConfig = await config.getTownConfig(this.ctx.storage);

const updated = await dispatch.updateAgentModelInContainer(
this.env,
townId,
mayor.id,
model,
smallModel,
conversationHistory || undefined,
containerConfig
containerConfig,
townConfig.organization_id
);
if (updated) {
console.log(
Expand Down Expand Up @@ -3498,9 +3504,24 @@ export class TownDO extends DurableObject<Env> {
const ghMatch = prUrl.match(/^https:\/\/github\.com\/([^/]+)\/([^/]+)\/pull\/(\d+)/);
if (ghMatch) {
const [, owner, repo, numberStr] = ghMatch;
const token = townConfig.git_auth.github_token;
// Fix 1 & 2: Token fallback chain — github_token → github_cli_pat → platform integration
let token = townConfig.git_auth.github_token ?? townConfig.github_cli_pat;
if (!token) {
// Try resolving from GitHub App installation as final fallback
const integrationId = townConfig.git_auth.platform_integration_id;
if (integrationId && this.env.GIT_TOKEN_SERVICE) {
try {
token = await this.env.GIT_TOKEN_SERVICE.getToken(integrationId);
} catch (err) {
console.warn(
`${TOWN_LOG} checkPRStatus: platform integration token lookup failed for ${integrationId}`,
err
);
}
}
}
if (!token) {
console.warn(`${TOWN_LOG} checkPRStatus: no github_token configured, cannot poll ${prUrl}`);
console.warn(`${TOWN_LOG} checkPRStatus: no github token available, cannot poll ${prUrl}`);
return null;
}

Expand Down
Loading
Loading