Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
228dd86
docs(gastown): add local debug testing guide and drain test script
jrf0110 Apr 3, 2026
bf7a035
fix(gastown): refresh container token when mayor is alive but waiting…
jrf0110 Apr 3, 2026
7fd0e5e
feat(gastown): skip review queue for gt:pr-fixup beads (#1985)
jrf0110 Apr 3, 2026
0349fcc
feat(container): expand apt-get block with build tools, ripgrep, and …
jrf0110 Apr 3, 2026
ca6e7fb
feat(gastown): polecat creates PRs, refinery reviews via GitHub, code…
jrf0110 Apr 4, 2026
f31a378
fix(gastown): check commit statuses, propagate hasUncheckedRuns, gate…
jrf0110 Apr 4, 2026
2460b2c
style: auto-format files for CI
jrf0110 Apr 4, 2026
3ae19f2
fix(gastown): bug fixes — org billing (#1756), reconciler spam (#1364…
jrf0110 Apr 4, 2026
d75325a
fix(reconciler): implement poison event protection in alarm loop
Apr 4, 2026
a21ebac
fix(reconciler): Rule 4 now catches both open and in_progress MR bead…
Apr 4, 2026
c2e87af
fix(reconciler): emit stop_agent when bead is cancelled with a workin…
Apr 4, 2026
630d4f9
fix(gastown): deduplicate checkPRFeedback call and hoist circuit breaker
Apr 4, 2026
a225e49
fix(reconciler): defer async SQL writes to synchronous event drain (B3)
Apr 4, 2026
ec1a624
fix(reconciler): add from-state guard to transition_bead action handler
Apr 4, 2026
240a4bd
fix(gastown): rollback agent to idle on dispatch container start failure
Apr 4, 2026
6c72827
fix(reconciler): parameterize circuit breaker SQL query (#1999)
jrf0110 Apr 4, 2026
188e246
fix(actions,reconciler): address review feedback on dispatch rollback…
Apr 4, 2026
10b33aa
merge: resolve conflicts with main for PR #2001
Apr 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cloudflare-gastown/container/plugin/mayor-tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ export function createMayorTools(client: MayorGastownClient) {
.array(tool.schema.string())
.describe('Labels to attach to the bead (e.g. ["gt:pr-fixup"])')
.optional(),
labels: tool.schema
.array(tool.schema.string())
.describe('Labels to attach to the bead (e.g. ["gt:pr-fixup"])')
.optional(),
},
async execute(args) {
const result = await client.sling({
Expand Down
9 changes: 9 additions & 0 deletions cloudflare-gastown/src/db/tables/town-events.table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export const TownEventRecord = z.object({
.pipe(z.record(z.string(), z.unknown())),
created_at: z.string(),
processed_at: z.string().nullable(),
retry_count: z.coerce.number().default(0),
});

export type TownEventRecord = z.output<typeof TownEventRecord>;
Expand All @@ -50,9 +51,17 @@ export function createTableTownEvents(): string {
payload: `text not null default '{}'`,
created_at: `text not null`,
processed_at: `text`,
retry_count: `integer not null default 0`,
});
}

/**
 * Schema migrations for `town_events` tables that were created before the
 * `retry_count` column existed.
 *
 * Each returned statement is a plain `ALTER TABLE … ADD COLUMN`; the statement
 * itself does not check whether the column is already present, so callers are
 * expected to tolerate a failure when it is re-run against an up-to-date table.
 *
 * @returns The ALTER statements to execute, in order.
 */
export function migrateTownEvents(): string[] {
  const addRetryCountColumn = `ALTER TABLE ${town_events} ADD COLUMN ${town_events.columns.retry_count} integer not null default 0`;
  const statements: string[] = [];
  statements.push(addRetryCountColumn);
  return statements;
}

export function getIndexesTownEvents(): string[] {
return [
`CREATE INDEX IF NOT EXISTS idx_town_events_pending ON ${town_events}(${town_events.columns.created_at}) WHERE ${town_events.columns.processed_at} IS NULL`,
Expand Down
97 changes: 79 additions & 18 deletions cloudflare-gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3460,26 +3460,75 @@ export class TownDO extends DurableObject<Env> {
};

// Phase 0: Drain events and apply state transitions
// Events may return actions (e.g. stop_agent on bead cancellation)
// that are merged into the Phase 1 action list.
const eventActions: Action[] = [];
try {
const pending = events.drainEvents(this.sql);
metrics.eventsDrained = pending.length;
if (pending.length > 0) {
logger.info('reconciler: draining events', { count: pending.length });
}
for (const event of pending) {
// Skip poison events that have already exceeded the retry limit
if (event.retry_count >= events.MAX_EVENT_RETRIES) {
logger.error('reconciler: marking poison event as processed', {
eventId: event.event_id,
eventType: event.event_type,
retryCount: event.retry_count,
payload: event.payload,
});
Sentry.captureException(
new Error(`Poison event skipped: ${event.event_type} (${event.event_id})`),
{
extra: {
eventId: event.event_id,
eventType: event.event_type,
retryCount: event.retry_count,
payload: event.payload,
},
}
);
events.markPoisoned(this.sql, event.event_id);
continue;
}

try {
reconciler.applyEvent(this.sql, event);
const actions = reconciler.applyEvent(this.sql, event);
eventActions.push(...actions);
events.markProcessed(this.sql, event.event_id);
} catch (err) {
const retryCount = events.incrementRetryCount(this.sql, event.event_id);
const errorMessage = err instanceof Error ? err.message : String(err);
logger.error('reconciler: applyEvent failed', {
eventId: event.event_id,
eventType: event.event_type,
error: err instanceof Error ? err.message : String(err),
retryCount,
maxRetries: events.MAX_EVENT_RETRIES,
error: errorMessage,
});
// Event stays unprocessed — will be retried on the next alarm tick.
// Mark it processed anyway after 3 consecutive failures to prevent
// a poison event from blocking the entire queue forever.
// For now, we skip it and let the next tick retry.
if (retryCount >= events.MAX_EVENT_RETRIES) {
logger.error('reconciler: event exceeded retry limit, marking as poison', {
eventId: event.event_id,
eventType: event.event_type,
retryCount,
payload: event.payload,
});
Sentry.captureException(
new Error(
`Poison event after ${retryCount} failures: ${event.event_type} (${event.event_id}) — ${errorMessage}`
),
{
extra: {
eventId: event.event_id,
eventType: event.event_type,
retryCount,
payload: event.payload,
},
}
);
events.markPoisoned(this.sql, event.event_id);
}
}
}
} catch (err) {
Expand Down Expand Up @@ -3510,10 +3559,13 @@ export class TownDO extends DurableObject<Env> {
const sideEffects: Array<() => Promise<void>> = [];
try {
const townConfig = await this.getTownConfig();
const actions = reconciler.reconcile(this.sql, {
draining: this._draining,
refineryCodeReview: townConfig.refinery?.code_review ?? true,
});
const actions = [
...eventActions,
...reconciler.reconcile(this.sql, {
draining: this._draining,
refineryCodeReview: townConfig.refinery?.code_review ?? true,
}),
];
metrics.actionsEmitted = actions.length;
for (const a of actions) {
metrics.actionsByType[a.type] = (metrics.actionsByType[a.type] ?? 0) + 1;
Expand Down Expand Up @@ -4038,6 +4090,7 @@ export class TownDO extends DurableObject<Env> {
const token = await this.resolveGitHubToken(townConfig);
if (!token) {
console.warn(`${TOWN_LOG} checkPRStatus: no GitHub token available, cannot poll ${prUrl}`);

return null;
}

Expand Down Expand Up @@ -4743,15 +4796,19 @@ export class TownDO extends DurableObject<Env> {
]);

// Apply each event to reconstruct state transitions
const replayEventActions: Action[] = [];
for (const event of rangeEvents) {
reconciler.applyEvent(this.sql, event);
replayEventActions.push(...reconciler.applyEvent(this.sql, event));
}

// Run reconciler against the resulting state
const tc = await this.getTownConfig();
const actions = reconciler.reconcile(this.sql, {
refineryCodeReview: tc.refinery?.code_review ?? true,
});
const actions = [
...replayEventActions,
...reconciler.reconcile(this.sql, {
refineryCodeReview: tc.refinery?.code_review ?? true,
}),
];

// Capture a state snapshot before rollback
const agentSnapshot = [
Expand Down Expand Up @@ -4824,16 +4881,20 @@ export class TownDO extends DurableObject<Env> {
try {
// Phase 0: Drain and apply pending events (same as real alarm loop)
const pending = events.drainEvents(this.sql);
const dryRunEventActions: Action[] = [];
for (const event of pending) {
reconciler.applyEvent(this.sql, event);
dryRunEventActions.push(...reconciler.applyEvent(this.sql, event));
events.markProcessed(this.sql, event.event_id);
}

// Phase 1: Reconcile against now-current state
const tc2 = await this.getTownConfig();
const actions = reconciler.reconcile(this.sql, {
refineryCodeReview: tc2.refinery?.code_review ?? true,
});
const actions = [
...dryRunEventActions,
...reconciler.reconcile(this.sql, {
refineryCodeReview: tc2.refinery?.code_review ?? true,
}),
];
const pendingEventCount = events.pendingEventCount(this.sql);
const actionsByType: Record<string, number> = {};
for (const a of actions) {
Expand Down
36 changes: 29 additions & 7 deletions cloudflare-gastown/src/dos/town/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,19 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro
// ── Bead mutations ──────────────────────────────────────────

case 'transition_bead': {
// When `from` is specified, verify the bead is in the expected state
// before transitioning. This guards against concurrent rule firings
// producing unexpected transitions. (H3, S1 — reconciliation-spec §4)
if (action.from !== null) {
const currentBead = beadOps.getBead(sql, action.bead_id);
if (currentBead && currentBead.status !== action.from) {
console.warn(
`${LOG} transition_bead: expected from=${action.from} but bead is ${currentBead.status}, skipping (bead=${action.bead_id})`
);
return null;
}
}

try {
const failureReason =
action.to === 'failed'
Expand Down Expand Up @@ -558,16 +571,25 @@ export function applyAction(ctx: ApplyActionContext, action: Action): (() => Pro

const capturedAgentId = agentId;
return async () => {
// Best-effort dispatch. If it fails, the agent stays 'working'
// and the bead stays 'in_progress'. The reconciler detects the
// mismatch on the next tick (idle agent hooked to in_progress
// bead) and retries dispatch.
await ctx.dispatchAgent(capturedAgentId, beadId, rigId).catch(err => {
// Best-effort dispatch. If it fails (thrown error or resolved
// false), roll the agent back to 'idle' so the reconciler can
// retry on the next tick. The bead stays 'in_progress' — no
// separate recovery needed (§5.4).
let accepted = false;
try {
accepted = await ctx.dispatchAgent(capturedAgentId, beadId, rigId);
} catch (err) {
console.warn(
`${LOG} dispatch_agent: container start failed for agent=${capturedAgentId} bead=${beadId}`,
`${LOG} dispatch_agent: container start threw for agent=${capturedAgentId} bead=${beadId}, rolling back to idle`,
err
);
});
}
if (!accepted) {
console.warn(
`${LOG} dispatch_agent: container did not accept start for agent=${capturedAgentId} bead=${beadId}, rolling back to idle`
);
agentOps.updateAgentStatus(sql, capturedAgentId, 'idle');
}
};
}

Expand Down
54 changes: 53 additions & 1 deletion cloudflare-gastown/src/dos/town/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
TownEventRecord,
createTableTownEvents,
getIndexesTownEvents,
migrateTownEvents,
} from '../../db/tables/town-events.table';
import type { TownEventType } from '../../db/tables/town-events.table';
import { query } from '../../util/query.util';
Expand All @@ -26,6 +27,16 @@ function now(): string {
/** Create the town_events table and indexes. Idempotent. */
export function initTownEventsTable(sql: SqlStorage): void {
query(sql, createTableTownEvents(), []);

// Migrations: add columns to existing tables (idempotent)
for (const stmt of migrateTownEvents()) {
try {
query(sql, stmt, []);
} catch {
// Column already exists — expected after first run
}
}

for (const idx of getIndexesTownEvents()) {
query(sql, idx, []);
}
Expand Down Expand Up @@ -138,6 +149,9 @@ export function upsertContainerStatus(
});
}

/**
 * Maximum number of times an event is retried before being marked as poison.
 *
 * Consumers compare an event's `retry_count` against this limit with `>=`,
 * so an event whose count has reached this value is no longer retried.
 */
export const MAX_EVENT_RETRIES = 3;

/**
* Drain all unprocessed events, ordered by creation time.
* Returns events with processed_at = NULL, oldest first.
Expand All @@ -150,7 +164,7 @@ export function drainEvents(sql: SqlStorage): TownEventRecord[] {
SELECT ${town_events.event_id}, ${town_events.event_type},
${town_events.agent_id}, ${town_events.bead_id},
${town_events.payload}, ${town_events.created_at},
${town_events.processed_at}
${town_events.processed_at}, ${town_events.retry_count}
FROM ${town_events}
WHERE ${town_events.processed_at} IS NULL
ORDER BY ${town_events.created_at} ASC
Expand All @@ -174,6 +188,44 @@ export function markProcessed(sql: SqlStorage, eventId: string): void {
);
}

/**
 * Bump the stored retry counter of a single event by one after a failed
 * processing attempt.
 *
 * @param sql - Storage handle the UPDATE is executed against.
 * @param eventId - Identifier of the event whose counter is incremented.
 * @returns The counter value reported by the UPDATE's RETURNING clause, or
 *   `1` when no row came back or the returned value is not a number.
 */
export function incrementRetryCount(sql: SqlStorage, eventId: string): number {
  // The statement text is kept flush-left so the SQL sent to storage is
  // unchanged by surrounding code indentation.
  const bumpRetryCountSql = /* sql */ `
UPDATE ${town_events}
SET ${town_events.columns.retry_count} = ${town_events.columns.retry_count} + 1
WHERE ${town_events.event_id} = ?
RETURNING ${town_events.retry_count}
`;
  const [updated] = [...query(sql, bumpRetryCountSql, [eventId])];
  const nextCount = updated?.retry_count;
  if (typeof nextCount === 'number') {
    return nextCount;
  }
  // No usable row returned: fall back to reporting a first failure.
  return 1;
}

/**
 * Retire a poison event: one that has exceeded the retry limit and must leave
 * the queue so later events can be processed.
 *
 * Implemented by stamping `processed_at` with the current time, which takes
 * the row out of the "processed_at IS NULL" set that the drain query selects.
 *
 * @param sql - Storage handle the UPDATE is executed against.
 * @param eventId - Identifier of the event to retire.
 */
export function markPoisoned(sql: SqlStorage, eventId: string): void {
  // The statement text is kept flush-left so the SQL sent to storage is
  // unchanged by surrounding code indentation.
  const stampProcessedSql = /* sql */ `
UPDATE ${town_events}
SET ${town_events.columns.processed_at} = ?
WHERE ${town_events.event_id} = ?
`;
  const bindings = [now(), eventId];
  query(sql, stampProcessedSql, bindings);
}

/**
* Delete old processed events beyond the retention window.
* Only deletes events that have been processed (processed_at IS NOT NULL)
Expand Down
Loading
Loading