Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cloudflare-gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,16 @@ export class TownDO extends DurableObject<Env> {
return beadOps.listBeadEvents(this.sql, options);
}

/** Overview stats for town list cards: bead counts, active agents, sparkline. */
async getOverviewStats(): Promise<beadOps.OverviewStats> {
return beadOps.getOverviewStats(this.sql);
}

/** Count beads closed in the last 7 days. */
async countClosedLast7d(): Promise<number> {
return beadOps.countClosedLast7d(this.sql);
}

/**
* Partially update a bead's editable fields.
* Only fields explicitly provided are updated (partial update semantics).
Expand Down
127 changes: 127 additions & 0 deletions cloudflare-gastown/src/dos/town/beads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -939,3 +939,130 @@ export function getConvoyFeatureBranch(sql: SqlStorage, convoyId: string): strin
if (rows.length === 0) return null;
return z.object({ feature_branch: z.string().nullable() }).parse(rows[0]).feature_branch;
}

// ── Overview Stats (for town list overview cards) ───────────────────

export type OverviewStats = {
beadCounts: {
open: number;
in_progress: number;
in_review: number;
closed: number;
failed: number;
};
activeAgents: number;
lastActivityAt: string | null;
activitySparkline: number[];
};

const CountRow = z.object({ status: z.string(), cnt: z.coerce.number() });
const AgentCountRow = z.object({ cnt: z.coerce.number() });
const LastActivityRow = z.object({ last_at: z.string().nullable() });
const BucketRow = z.object({ bucket_idx: z.coerce.number(), cnt: z.coerce.number() });

/**
* Compute overview stats for a single town in one batch:
* - Bead counts by status (excluding agent/message types)
* - Active agent count (working or stalled)
* - Last activity timestamp
* - 24h activity sparkline (48 buckets, 30 min each)
*/
export function getOverviewStats(sql: SqlStorage): OverviewStats {
// 1. Bead counts by status
const beadRows = [
...query(
sql,
/* sql */ `
SELECT ${beads.status} AS status, COUNT(*) AS cnt
FROM ${beads}
WHERE ${beads.type} NOT IN ('agent', 'message')
GROUP BY ${beads.status}
`,
[]
),
];
const beadCounts = { open: 0, in_progress: 0, in_review: 0, closed: 0, failed: 0 };
for (const row of CountRow.array().parse(beadRows)) {
const s = row.status;
if (s === 'open') beadCounts.open = row.cnt;
else if (s === 'in_progress') beadCounts.in_progress = row.cnt;
else if (s === 'in_review') beadCounts.in_review = row.cnt;
else if (s === 'closed') beadCounts.closed = row.cnt;
else if (s === 'failed') beadCounts.failed = row.cnt;
}

// 2. Active agents (working or stalled)
const agentRows = [
...query(
sql,
/* sql */ `
SELECT COUNT(*) AS cnt
FROM ${agent_metadata}
WHERE ${agent_metadata.status} IN ('working', 'stalled')
`,
[]
),
];
const activeAgents = AgentCountRow.parse(agentRows[0] ?? { cnt: 0 }).cnt;

// 3. Last activity (most recent bead event)
const lastRows = [
...query(
sql,
/* sql */ `
SELECT MAX(${bead_events.created_at}) AS last_at
FROM ${bead_events}
`,
[]
),
];
const lastActivityAt = LastActivityRow.parse(lastRows[0] ?? { last_at: null }).last_at;

// 4. 24h sparkline — count events per 30-min bucket
// Bucket 0 = 24h ago, bucket 47 = now
const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
const bucketRows = [
...query(
sql,
/* sql */ `
SELECT
CAST((julianday(${bead_events.created_at}) - julianday(?)) * 48.0 AS INTEGER) AS bucket_idx,
COUNT(*) AS cnt
FROM ${bead_events}
WHERE ${bead_events.created_at} > ?
GROUP BY bucket_idx
HAVING bucket_idx >= 0 AND bucket_idx < 48
`,
[cutoff, cutoff]
),
];
const sparkline = new Array<number>(48).fill(0);
for (const row of BucketRow.array().parse(bucketRows)) {
if (row.bucket_idx >= 0 && row.bucket_idx < 48) {
sparkline[row.bucket_idx] = row.cnt;
}
}

return { beadCounts, activeAgents, lastActivityAt, activitySparkline: sparkline };
}

/**
* Count beads closed in the last 7 days (excluding agent/message types).
*/
export function countClosedLast7d(sql: SqlStorage): number {
const cutoff = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString();
const rows = [
...query(
sql,
/* sql */ `
SELECT COUNT(*) AS cnt
FROM ${beads}
WHERE ${beads.type} NOT IN ('agent', 'message')
AND ${beads.status} = 'closed'
AND ${beads.columns.closed_at} > ?
`,
[cutoff]
),
];
return AgentCountRow.parse(rows[0] ?? { cnt: 0 }).cnt;
}
179 changes: 179 additions & 0 deletions cloudflare-gastown/src/trpc/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
/* eslint-disable @typescript-eslint/await-thenable -- DO RPC stubs return Rpc.Promisified which is thenable at runtime */
import { TRPCError } from '@trpc/server';
import { z } from 'zod';
import { sql as dsql, and, eq, isNull, gte } from 'drizzle-orm';
import { getWorkerDb } from '@kilocode/db/client';
import { microdollar_usage } from '@kilocode/db/schema';
import { router, gastownProcedure, adminProcedure } from './init';
import { getTownDOStub } from '../dos/Town.do';
import { getTownContainerStub } from '../dos/TownContainer.do';
Expand Down Expand Up @@ -35,6 +38,7 @@ import {
RpcAlarmStatusOutput,
RpcOrgTownOutput,
RpcMergeQueueDataOutput,
RpcTownOverviewOutput,
} from './schemas';
import type { TRPCContext } from './init';

Expand Down Expand Up @@ -96,6 +100,38 @@ function listAccessibleOrgIds(memberships: JwtOrgMembership[]): string[] {
return memberships.filter(m => m.role !== 'billing_manager').map(m => m.orgId);
}

/**
* Query 7-day cost and token totals from microdollar_usage via Hyperdrive.
* Filters by kilo_user_id (personal) or organization_id (org).
*/
async function queryUsageLast7d(
env: Env,
scope: { type: 'user'; userId: string } | { type: 'org'; organizationId: string }
): Promise<{ costMicrodollars: number; tokens: number }> {
if (!env.HYPERDRIVE) return { costMicrodollars: 0, tokens: 0 };
const db = getWorkerDb(env.HYPERDRIVE.connectionString, { statement_timeout: 5_000 });
const cutoff = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString();
const ownerFilter =
scope.type === 'user'
? and(
eq(microdollar_usage.kilo_user_id, scope.userId),
isNull(microdollar_usage.organization_id)
)
: eq(microdollar_usage.organization_id, scope.organizationId);
const rows = await db
.select({
totalCost: dsql<number>`COALESCE(SUM(${microdollar_usage.cost})::float, 0)`,
totalTokens: dsql<number>`COALESCE(SUM(${microdollar_usage.input_tokens} + ${microdollar_usage.output_tokens})::float, 0)`,
})
.from(microdollar_usage)
.where(and(ownerFilter, gte(microdollar_usage.created_at, cutoff)));
const row = rows[0];
return {
costMicrodollars: row?.totalCost ?? 0,
tokens: row?.totalTokens ?? 0,
};
}

/**
* Common interface for the rig/town management methods shared by
* GastownUserDO and GastownOrgDO stubs. Used to abstract over
Expand Down Expand Up @@ -356,6 +392,76 @@ export const gastownRouter = router({
return userStub.listTowns();
}),

/**
* Overview data for the town list page — cards with bead counts,
* sparklines, active agents, plus aggregate stats across all towns.
*/
getTownOverview: gastownProcedure.output(RpcTownOverviewOutput).query(async ({ ctx }) => {
const userStub = getGastownUserStub(ctx.env, ctx.userId);

// Fan out DO stats + Postgres usage query in parallel
const [towns, usage] = await Promise.all([
userStub.listTowns(),
queryUsageLast7d(ctx.env, { type: 'user', userId: ctx.userId }),
]);

const statsResults = await Promise.allSettled(
towns.map(async town => {
const townStub = getTownDOStub(ctx.env, town.id);
const [overview, closedLast7d] = await Promise.all([
townStub.getOverviewStats() as Promise<{
beadCounts: {
open: number;
in_progress: number;
in_review: number;
closed: number;
failed: number;
};
activeAgents: number;
lastActivityAt: string | null;
activitySparkline: number[];
}>,
townStub.countClosedLast7d() as Promise<number>,
]);
return { town, overview, closedLast7d };
})
);

const cards = [];
let totalOpen = 0;
let totalClosedLast7d = 0;
let totalActiveAgents = 0;

for (const result of statsResults) {
if (result.status === 'rejected') continue;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: Silently dropping failed town stats can misclassify non-empty accounts as empty

listTowns() has already confirmed the town exists, but this continue removes it from cards whenever a TownDO RPC fails. In the personal overview that can produce cards.length === 0, which triggers the onboarding redirect even for users who already have towns. Consider returning a fallback card or surfacing an error instead of skipping the town entirely. The same pattern appears again in getOrgTownOverview below.

const { town, overview, closedLast7d } = result.value;
cards.push({
townId: town.id,
name: town.name,
lastActivityAt: overview.lastActivityAt,
beadCounts: overview.beadCounts,
activeAgents: overview.activeAgents,
activitySparkline: overview.activitySparkline,
});
totalOpen +=
overview.beadCounts.open + overview.beadCounts.in_progress + overview.beadCounts.in_review;
totalClosedLast7d += closedLast7d;
totalActiveAgents += overview.activeAgents;
}

return {
cards,
aggregate: {
totalTowns: cards.length,
openBeads: totalOpen,
closedLast7d: totalClosedLast7d,
activeAgents: totalActiveAgents,
costLast7dMicrodollars: usage.costMicrodollars,
tokensLast7d: usage.tokens,
},
};
}),

getTown: gastownProcedure
.input(z.object({ townId: z.string().uuid() }))
.output(RpcTownOutput)
Expand Down Expand Up @@ -1269,6 +1375,79 @@ export const gastownRouter = router({
return stub.listTowns();
}),

getOrgTownOverview: gastownProcedure
.input(z.object({ organizationId: z.string().uuid() }))
.output(RpcTownOverviewOutput)
.query(async ({ input, ctx }) => {
const membership = getOrgMembership(ctx.orgMemberships, input.organizationId);
if (!membership || membership.role === 'billing_manager')
throw new TRPCError({ code: 'FORBIDDEN' });
const orgStub = getGastownOrgStub(ctx.env, input.organizationId);

const [towns, usage] = await Promise.all([
orgStub.listTowns(),
queryUsageLast7d(ctx.env, { type: 'org', organizationId: input.organizationId }),
]);

const statsResults = await Promise.allSettled(
towns.map(async town => {
const townStub = getTownDOStub(ctx.env, town.id);
const [overview, closedLast7d] = await Promise.all([
townStub.getOverviewStats() as Promise<{
beadCounts: {
open: number;
in_progress: number;
in_review: number;
closed: number;
failed: number;
};
activeAgents: number;
lastActivityAt: string | null;
activitySparkline: number[];
}>,
townStub.countClosedLast7d() as Promise<number>,
]);
return { town, overview, closedLast7d };
})
);

const cards = [];
let totalOpen = 0;
let totalClosedLast7d = 0;
let totalActiveAgents = 0;

for (const result of statsResults) {
if (result.status === 'rejected') continue;
const { town, overview, closedLast7d } = result.value;
cards.push({
townId: town.id,
name: town.name,
lastActivityAt: overview.lastActivityAt,
beadCounts: overview.beadCounts,
activeAgents: overview.activeAgents,
activitySparkline: overview.activitySparkline,
});
totalOpen +=
overview.beadCounts.open +
overview.beadCounts.in_progress +
overview.beadCounts.in_review;
totalClosedLast7d += closedLast7d;
totalActiveAgents += overview.activeAgents;
}

return {
cards,
aggregate: {
totalTowns: cards.length,
openBeads: totalOpen,
closedLast7d: totalClosedLast7d,
activeAgents: totalActiveAgents,
costLast7dMicrodollars: usage.costMicrodollars,
tokensLast7d: usage.tokens,
},
};
}),

createOrgTown: gastownProcedure
.input(z.object({ organizationId: z.string().uuid(), name: z.string().min(1).max(64) }))
.output(RpcOrgTownOutput)
Expand Down
Loading
Loading