Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/query-progress-metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@chkit/clickhouse": patch
"@chkit/plugin-backfill": patch
---

Report intermediate query metrics (read_rows, written_rows, elapsed, etc.) from system.processes during backfill polling. Previously queryStatus returned only `{ status: 'running' }` with no metrics, and onProgress only fired on state transitions. Now every poll with metric changes triggers onProgress, giving visibility into long-running chunks.
25 changes: 21 additions & 4 deletions packages/clickhouse/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import {

/**
 * Point-in-time status of a submitted ClickHouse query, as reported by
 * the executor's queryStatus() poll.
 *
 * The metric fields are cumulative counters. While the query is still
 * running they are sampled from system.processes; `elapsedMs` is derived
 * from the `elapsed` seconds column (rounded to whole milliseconds).
 */
export interface QueryStatus
  status: 'running' | 'finished' | 'failed' | 'unknown'
  // Rows read so far (cumulative; populated while status === 'running').
  readRows?: number
  // Bytes read so far (cumulative).
  readBytes?: number
  // Rows written so far (cumulative).
  writtenRows?: number
  // Bytes written so far (cumulative).
  writtenBytes?: number
  // Wall-clock time the query has been running, in milliseconds.
  elapsedMs?: number
  // Total runtime once the query has finished — NOTE(review): appears to be
  // set only on the 'finished' path (outside this hunk); confirm.
  durationMs?: number
  // Failure message — presumably set when status === 'failed'; verify.
  error?: string
}
Expand Down Expand Up @@ -273,13 +276,27 @@ export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhou
async queryStatus(queryId: string, options?: { afterTime?: string }): Promise<QueryStatus> {
try {
const running = await client.query({
query: `SELECT count() AS cnt FROM clusterAllReplicas('parallel_replicas', system.processes) WHERE query_id = {qid:String} SETTINGS skip_unavailable_shards = 1`,
query: `SELECT read_rows, read_bytes, written_rows, written_bytes, elapsed FROM clusterAllReplicas('parallel_replicas', system.processes) WHERE query_id = {qid:String} SETTINGS skip_unavailable_shards = 1`,
query_params: { qid: queryId },
format: 'JSONEachRow',
})
const runningRows = await running.json<{ cnt: string }>()
if (Number(runningRows[0]?.cnt) > 0) {
return { status: 'running' }
const runningRows = await running.json<{
read_rows: string
read_bytes: string
written_rows: string
written_bytes: string
elapsed: string
}>()
if (runningRows.length > 0) {
const proc = runningRows[0]!
return {
status: 'running',
readRows: Number(proc.read_rows),
readBytes: Number(proc.read_bytes),
writtenRows: Number(proc.written_rows),
writtenBytes: Number(proc.written_bytes),
elapsedMs: Math.round(Number(proc.elapsed) * 1000),
}
}

const afterTime = options?.afterTime ?? '1970-01-01T00:00:00Z'
Expand Down
38 changes: 38 additions & 0 deletions packages/plugin-backfill/src/async-backfill.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,44 @@ describe('executeBackfill', () => {
expect(lastSnapshot.c1.status).toBe('done')
})

test('reports intermediate metrics while running', async () => {
  const queryId = `backfill-${PLAN_ID}-c1`
  // Two in-flight polls with growing counters, then a terminal result.
  const statuses = new Map<string, QueryStatus[]>([
    [queryId, [
      { status: 'running', readRows: 100, readBytes: 400, writtenRows: 0, writtenBytes: 0, elapsedMs: 1000 },
      { status: 'running', readRows: 500, readBytes: 2000, writtenRows: 50, writtenBytes: 200, elapsedMs: 3000 },
      { status: 'finished', writtenRows: 200, writtenBytes: 800, durationMs: 5000 },
    ]],
  ])

  const snapshots: BackfillProgress[] = []

  await executeBackfill({
    executor: createMockExecutor(statuses),
    planId: PLAN_ID,
    chunks: [chunks[0]],
    buildQuery: () => 'SELECT 1',
    pollIntervalMs: 10,
    onProgress: (p) => { snapshots.push(structuredClone(p)) },
  })

  // Expect at least: submitted, running (with metrics), done.
  expect(snapshots.length).toBeGreaterThanOrEqual(3)

  // Each running poll with changed metrics must surface as its own snapshot.
  const running = snapshots.filter((s) => s.c1.status === 'running')
  expect(running.length).toBe(2)
  const [first, second] = running
  expect(first.c1.readRows).toBe(100)
  expect(first.c1.elapsedMs).toBe(1000)
  expect(second.c1.readRows).toBe(500)
  expect(second.c1.writtenRows).toBe(50)
  expect(second.c1.elapsedMs).toBe(3000)

  // The last snapshot reflects the terminal state and final counters.
  const final = snapshots.at(-1)!
  expect(final.c1.status).toBe('done')
  expect(final.c1.writtenRows).toBe(200)
})

test('resumes from saved progress', async () => {
const queryId = `backfill-${PLAN_ID}-c2`
const statuses = new Map<string, QueryStatus[]>([
Expand Down
21 changes: 19 additions & 2 deletions packages/plugin-backfill/src/async-backfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ export interface BackfillChunkState {
// ClickHouse query id for this chunk's backfill INSERT (set once submitted).
queryId?: string
// ISO timestamp at submission — NOTE(review): format inferred from name; confirm.
submittedAt?: string
// ISO timestamp when the query reached a terminal state.
finishedAt?: string
// Cumulative rows read, sampled from the executor while running.
readRows?: number
// Cumulative bytes read while running.
readBytes?: number
// Cumulative rows written while running (and final count when finished).
writtenRows?: number
// Cumulative bytes written while running.
writtenBytes?: number
// Milliseconds the query has been executing (running snapshots only).
elapsedMs?: number
// Total runtime in milliseconds once finished.
durationMs?: number
// Failure message — presumably set when the chunk's query failed; verify.
error?: string
}

Expand All @@ -58,7 +61,21 @@ function applyQueryStatus(
qs: QueryStatus,
): { state: BackfillChunkState; changed: boolean } {
if (qs.status === 'running') {
return { state: { ...state, status: 'running' }, changed: state.status !== 'running' }
const next: BackfillChunkState = {
...state,
status: 'running',
readRows: qs.readRows,
readBytes: qs.readBytes,
writtenRows: qs.writtenRows,
writtenBytes: qs.writtenBytes,
elapsedMs: qs.elapsedMs,
}
const metricsChanged =
state.status !== 'running' ||
state.readRows !== qs.readRows ||
state.writtenRows !== qs.writtenRows ||
state.elapsedMs !== qs.elapsedMs
return { state: next, changed: metricsChanged }
}
if (qs.status === 'finished') {
return {
Expand Down
Loading