Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions cloudflare-gastown/container/src/control-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import {
activeServerCount,
getUptime,
stopAll,
drainAll,
getAgentEvents,
registerEventSink,
} from './process-manager';
import { startHeartbeat, stopHeartbeat } from './heartbeat';
import { startHeartbeat, stopHeartbeat, notifyContainerReady } from './heartbeat';
import { pushContext as pushDashboardContext } from './dashboard-context';
import { mergeBranch, setupRigBrowseWorktree } from './git-manager';
import {
Expand Down Expand Up @@ -92,6 +93,15 @@ app.use('*', async (c, next) => {

// GET /health
app.get('/health', c => {
// When the TownDO is draining, it passes the drain nonce and town
// ID via headers so idle containers (no running agents) can
// acknowledge readiness and clear the drain flag.
const drainNonce = c.req.header('X-Drain-Nonce');
const townId = c.req.header('X-Town-Id');
if (drainNonce && townId) {
void notifyContainerReady(townId, drainNonce);
}

const response: HealthResponse = {
status: 'ok',
agents: activeAgentCount(),
Expand Down Expand Up @@ -723,15 +733,26 @@ export function startControlServer(): void {
startHeartbeat(apiUrl, authToken);
}

// Handle graceful shutdown
// Handle graceful shutdown (immediate, no drain — used by SIGINT for dev)
const shutdown = async () => {
console.log('Shutting down control server...');
stopHeartbeat();
await stopAll();
process.exit(0);
};

process.on('SIGTERM', () => void shutdown());
process.on(
'SIGTERM',
() =>
void (async () => {
console.log('[control-server] SIGTERM received — starting graceful drain...');
stopHeartbeat();
await drainAll();
await stopAll();
process.exit(0);
})()
);

process.on('SIGINT', () => void shutdown());

// Track connected WebSocket clients with optional agent filter
Expand Down
63 changes: 63 additions & 0 deletions cloudflare-gastown/container/src/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const HEARTBEAT_INTERVAL_MS = 30_000;
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
let gastownApiUrl: string | null = null;
let sessionToken: string | null = null;
/** Set once we've successfully acknowledged container-ready. */
let containerReadyAcknowledged = false;

/**
* Configure and start the heartbeat reporter.
Expand Down Expand Up @@ -38,6 +40,49 @@ export function stopHeartbeat(): void {
console.log('Heartbeat reporter stopped');
}

/**
* Notify the TownDO that the replacement container is ready.
* Exported so the health endpoint can trigger it when the TownDO
* passes the drain nonce via headers (handles idle containers that
* have no running agents and thus no per-agent heartbeats).
*/
export async function notifyContainerReady(townId: string, drainNonce: string): Promise<void> {
if (containerReadyAcknowledged) return;
await acknowledgeContainerReady(townId, drainNonce);
}

/**
* Call POST /container-ready to acknowledge that this is a fresh
* container replacing an evicted one. Clears the TownDO drain flag
* so the reconciler can resume dispatching.
*/
async function acknowledgeContainerReady(townId: string, drainNonce: string): Promise<void> {
const apiUrl = gastownApiUrl ?? process.env.GASTOWN_API_URL;
const currentToken = process.env.GASTOWN_CONTAINER_TOKEN ?? sessionToken;
if (!apiUrl || !currentToken) return;

try {
const response = await fetch(`${apiUrl}/api/towns/${townId}/container-ready`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${currentToken}`,
},
body: JSON.stringify({ nonce: drainNonce }),
});
if (response.ok) {
containerReadyAcknowledged = true;
console.log(`[heartbeat] container-ready acknowledged for town=${townId}`);
} else {
console.warn(
`[heartbeat] container-ready failed for town=${townId}: ${response.status} ${response.statusText}`
);
}
} catch (err) {
console.warn(`[heartbeat] container-ready error for town=${townId}:`, err);
}
}

async function sendHeartbeats(): Promise<void> {
// Prefer the live container token (refreshed via POST /refresh-token)
// over the token captured at startHeartbeat() time.
Expand All @@ -46,6 +91,12 @@ async function sendHeartbeats(): Promise<void> {

const active = listAgents().filter(a => a.status === 'running' || a.status === 'starting');

// When no agents are active, the per-agent heartbeat loop has
// nothing to send. Idle container drain acknowledgment is handled
// by the /health endpoint instead (the TownDO passes the nonce via
// X-Drain-Nonce headers in ensureContainerReady).
if (active.length === 0) return;

for (const agent of active) {
const payload: HeartbeatPayload = {
agentId: agent.agentId,
Expand Down Expand Up @@ -77,6 +128,18 @@ async function sendHeartbeats(): Promise<void> {
console.warn(
`Heartbeat failed for agent ${agent.agentId}: ${response.status} ${response.statusText}`
);
} else if (!containerReadyAcknowledged) {
// If the TownDO is draining, the heartbeat response includes a
// drainNonce. Use it to call /container-ready and clear drain.
try {
const body = (await response.json()) as { data?: { drainNonce?: string } };
const nonce = body?.data?.drainNonce;
if (nonce) {
void acknowledgeContainerReady(agent.townId, nonce);
}
} catch {
// Non-JSON or unexpected shape — ignore
}
}
} catch (err) {
console.warn(`Heartbeat error for agent ${agent.agentId}:`, err);
Expand Down
193 changes: 193 additions & 0 deletions cloudflare-gastown/container/src/process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,199 @@ export function activeServerCount(): number {
return sdkInstances.size;
}

/**
* Gracefully drain all running agents before container eviction.
*
* 4-phase sequence:
* 1. Notify TownDO of the eviction (fire-and-forget)
* 2. Nudge running polecats/refineries to commit & push
* 3. Poll up to 10 min waiting for agents to finish
* 4. Force-save any stragglers via WIP git commit + push
*
* Never throws — all errors are logged and swallowed so the caller
* can always proceed to stopAll() + process.exit().
*/
export async function drainAll(): Promise<void> {
const DRAIN_LOG = '[drain]';

// ── Phase 1: Notify TownDO ──────────────────────────────────────────
try {
const apiUrl = process.env.GASTOWN_API_URL;
const token = process.env.GASTOWN_CONTAINER_TOKEN;
// Grab townId from any registered agent — all agents in a container
// belong to the same town.
const anyAgent = [...agents.values()][0];
const townId = anyAgent?.townId;

if (apiUrl && token && townId) {
console.log(`${DRAIN_LOG} Phase 1: notifying TownDO of container eviction`);
const resp = await fetch(`${apiUrl}/api/towns/${townId}/container-eviction`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${token}`,
},
signal: AbortSignal.timeout(10_000),
});
console.log(`${DRAIN_LOG} Phase 1: TownDO responded ${resp.status}`);
} else {
console.warn(
`${DRAIN_LOG} Phase 1: skipping TownDO notification (missing apiUrl=${!!apiUrl} token=${!!token} townId=${!!townId})`
);
}
} catch (err) {
console.warn(`${DRAIN_LOG} Phase 1: TownDO notification failed, continuing:`, err);
}

// ── Phase 2: Nudge running agents to save ───────────────────────────
const runningAgents = [...agents.values()].filter(a => a.status === 'running');
console.log(`${DRAIN_LOG} Phase 2: nudging ${runningAgents.length} running agents`);

for (const agent of runningAgents) {
try {
let nudgeMessage: string | null = null;

if (agent.role === 'polecat') {
nudgeMessage =
'URGENT: The container is shutting down in ~15 minutes. Please commit and push your current changes immediately, then call gt_done. You have 2 minutes before a forced save.';
} else if (agent.role === 'refinery') {
nudgeMessage =
'URGENT: The container is shutting down. If your review is complete, call gt_done now. Otherwise your work will be pushed as a WIP commit.';
}
// Mayor and other roles: no nudge needed

if (nudgeMessage) {
// Cancel the idle timer before nudging — if the agent was
// already idle, the timer could fire mid-nudge and exit the
// agent before it processes the eviction message.
clearIdleTimer(agent.agentId);
console.log(`${DRAIN_LOG} Phase 2: nudging ${agent.role} agent ${agent.agentId}`);
await sendMessage(agent.agentId, nudgeMessage);
}
} catch (err) {
console.warn(
`${DRAIN_LOG} Phase 2: failed to nudge agent ${agent.agentId} (${agent.role}):`,
err
);
}
}

// ── Phase 3: Wait up to 10 minutes ──────────────────────────────────
const DRAIN_WAIT_MS = 10 * 60 * 1000;
const pollInterval = 5000;
const start = Date.now();
console.log(`${DRAIN_LOG} Phase 3: waiting up to ${DRAIN_WAIT_MS / 1000}s for agents to finish`);

while (Date.now() - start < DRAIN_WAIT_MS) {
const running = [...agents.values()].filter(a => a.status === 'running');
if (running.length === 0) break;
console.log(`${DRAIN_LOG} Waiting for ${running.length} agents...`);
await new Promise(r => setTimeout(r, pollInterval));
}

// ── Phase 4: Force-save remaining agents ────────────────────────────
// Two sub-steps: first freeze all stragglers (cancel idle timers,
// abort event subscriptions and SDK sessions), then snapshot each
// worktree. Freezing first prevents the normal completion path
// (idle timer → onExit → bead completion) from racing with the WIP
// git save, and avoids .git/index.lock collisions with agent git ops.
const stragglers = [...agents.values()].filter(a => a.status === 'running');
if (stragglers.length > 0) {
console.log(`${DRAIN_LOG} Phase 4: freezing ${stragglers.length} straggler(s)`);
} else {
console.log(`${DRAIN_LOG} Phase 4: all agents finished, no force-save needed`);
}

// 4a: Freeze — cancel idle timers and abort sessions so no
// completion/exit callbacks can fire during the git snapshot.
// Only agents that freeze successfully are safe to snapshot.
const frozen: typeof stragglers = [];
for (const agent of stragglers) {
try {
// Cancel idle timer FIRST — prevents the timer from firing and
// marking the agent as completed via onExit() while we abort.
clearIdleTimer(agent.agentId);

// Abort event subscription
const controller = eventAbortControllers.get(agent.agentId);
if (controller) {
controller.abort();
eventAbortControllers.delete(agent.agentId);
}

// Abort the SDK session
const instance = sdkInstances.get(agent.workdir);
if (instance) {
await instance.client.session.abort({
path: { id: agent.sessionId },
});
}

agent.status = 'exited';
agent.exitReason = 'container eviction';
frozen.push(agent);
console.log(`${DRAIN_LOG} Phase 4: froze agent ${agent.agentId}`);
} catch (err) {
// Freeze failed — the session may still be writing to the
// worktree. Skip this agent in 4b to avoid .git/index.lock
// races and partial snapshots.
console.warn(
`${DRAIN_LOG} Phase 4: failed to freeze agent ${agent.agentId}, skipping snapshot:`,
err
);
}
}

// 4b: Snapshot — git add/commit/push each worktree now that
// all sessions are frozen. Only iterate agents that froze
// successfully; unfrozen agents are skipped to avoid racing
// with a still-active SDK session.
for (const agent of frozen) {
try {
console.log(`${DRAIN_LOG} Phase 4: force-saving agent ${agent.agentId} in ${agent.workdir}`);

// Check whether a remote named "origin" exists. Lightweight
// workspaces (mayor/triage) are created with `git init` and
// never add a remote, so pushing would fail with
// "fatal: 'origin' does not appear to be a git repository".
const remoteCheck = Bun.spawn(['git', 'remote', 'get-url', 'origin'], {
cwd: agent.workdir,
stdout: 'pipe',
stderr: 'pipe',
});
const hasOrigin = (await remoteCheck.exited) === 0;

const gitCmd = hasOrigin
? "git add -A && git commit --allow-empty -m 'WIP: container eviction save' && git push --set-upstream origin HEAD --no-verify"
: "git add -A && git commit --allow-empty -m 'WIP: container eviction save'";

if (!hasOrigin) {
console.warn(
`${DRAIN_LOG} Phase 4: no origin remote for agent ${agent.agentId}, committing locally only (push skipped)`
);
}

const proc = Bun.spawn(['bash', '-c', gitCmd], {
cwd: agent.workdir,
stdout: 'pipe',
stderr: 'pipe',
});
const exitCode = await proc.exited;
const stdout = await new Response(proc.stdout).text();
const stderr = await new Response(proc.stderr).text();
console.log(
`${DRAIN_LOG} Phase 4: agent ${agent.agentId} git save exited ${exitCode}` +
(stdout ? ` stdout=${stdout.trim()}` : '') +
(stderr ? ` stderr=${stderr.trim()}` : '')
);
} catch (err) {
console.warn(`${DRAIN_LOG} Phase 4: force-save failed for agent ${agent.agentId}:`, err);
}
}

console.log(`${DRAIN_LOG} Drain complete`);
}

export async function stopAll(): Promise<void> {
// Cancel all idle timers
for (const [, timer] of idleTimers) {
Expand Down
1 change: 1 addition & 0 deletions cloudflare-gastown/src/db/tables/town-events.table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export const TownEventType = z.enum([
'agent_done',
'agent_completed',
'container_status',
'container_eviction',
'pr_status_changed',
'bead_created',
'bead_cancelled',
Expand Down
Loading
Loading