From 4cf4fdcdfda37461ef9840ba743898d1ed8d1eca Mon Sep 17 00:00:00 2001 From: KeKs0r Date: Sat, 28 Mar 2026 20:48:08 +0100 Subject: [PATCH 1/2] feat(plugin-backfill): report intermediate query metrics during polling queryStatus now fetches read_rows, written_rows, elapsed etc. from system.processes instead of just count(). onProgress fires whenever metrics change, not only on state transitions, giving visibility into long-running chunks. Co-Authored-By: Claude Opus 4.6 --- .changeset/query-progress-metrics.md | 6 +++ packages/clickhouse/src/index.ts | 25 ++++++++++-- .../src/async-backfill.test.ts | 38 +++++++++++++++++++ .../plugin-backfill/src/async-backfill.ts | 21 +++++++++- 4 files changed, 84 insertions(+), 6 deletions(-) create mode 100644 .changeset/query-progress-metrics.md diff --git a/.changeset/query-progress-metrics.md b/.changeset/query-progress-metrics.md new file mode 100644 index 0000000..2c497b6 --- /dev/null +++ b/.changeset/query-progress-metrics.md @@ -0,0 +1,6 @@ +--- +"@chkit/clickhouse": patch +"@chkit/plugin-backfill": patch +--- + +Report intermediate query metrics (read_rows, written_rows, elapsed, etc.) from system.processes during backfill polling. Previously queryStatus returned only `{ status: 'running' }` with no metrics, and onProgress only fired on state transitions. Now every poll with metric changes triggers onProgress, giving visibility into long-running chunks. diff --git a/packages/clickhouse/src/index.ts b/packages/clickhouse/src/index.ts index fdd5c8b..704b371 100644 --- a/packages/clickhouse/src/index.ts +++ b/packages/clickhouse/src/index.ts @@ -19,8 +19,11 @@ import { export interface QueryStatus { status: 'running' | 'finished' | 'failed' | 'unknown' + readRows?: number + readBytes?: number writtenRows?: number writtenBytes?: number + elapsedMs?: number durationMs?: number error?: string } @@ -273,13 +276,27 @@ export function createClickHouseExecutor(config: NonNullable { try { const running = await client.query({ - query: `SELECT count() AS cnt FROM clusterAllReplicas('parallel_replicas', system.processes) WHERE query_id = {qid:String} SETTINGS skip_unavailable_shards = 1`, + query: `SELECT read_rows, read_bytes, written_rows, written_bytes, elapsed FROM clusterAllReplicas('parallel_replicas', system.processes) WHERE query_id = {qid:String} SETTINGS skip_unavailable_shards = 1 LIMIT 1`, query_params: { qid: queryId }, format: 'JSONEachRow', }) - const runningRows = await running.json<{ cnt: string }>() - if (Number(runningRows[0]?.cnt) > 0) { - return { status: 'running' } + const runningRows = await running.json<{ + read_rows: string + read_bytes: string + written_rows: string + written_bytes: string + elapsed: string + }>() + if (runningRows.length > 0) { + const proc = runningRows[0]! + return { + status: 'running', + readRows: Number(proc.read_rows), + readBytes: Number(proc.read_bytes), + writtenRows: Number(proc.written_rows), + writtenBytes: Number(proc.written_bytes), + elapsedMs: Math.round(Number(proc.elapsed) * 1000), + } } const afterTime = options?.afterTime ?? '1970-01-01T00:00:00Z' diff --git a/packages/plugin-backfill/src/async-backfill.test.ts b/packages/plugin-backfill/src/async-backfill.test.ts index d92df05..7ed13eb 100644 --- a/packages/plugin-backfill/src/async-backfill.test.ts +++ b/packages/plugin-backfill/src/async-backfill.test.ts @@ -129,6 +129,44 @@ describe('executeBackfill', () => { expect(lastSnapshot.c1.status).toBe('done') }) + test('reports intermediate metrics while running', async () => { + const statuses = new Map([ + [`backfill-${PLAN_ID}-c1`, [ + { status: 'running', readRows: 100, readBytes: 400, writtenRows: 0, writtenBytes: 0, elapsedMs: 1000 }, + { status: 'running', readRows: 500, readBytes: 2000, writtenRows: 50, writtenBytes: 200, elapsedMs: 3000 }, + { status: 'finished', writtenRows: 200, writtenBytes: 800, durationMs: 5000 }, + ]], + ]) + + const progressSnapshots: BackfillProgress[] = [] + + await executeBackfill({ + executor: createMockExecutor(statuses), + planId: PLAN_ID, + chunks: [chunks[0]], + buildQuery: () => 'SELECT 1', + pollIntervalMs: 10, + onProgress: (p) => { progressSnapshots.push(structuredClone(p)) }, + }) + + // Should have at least 3 progress calls: submitted, running (with metrics), done + expect(progressSnapshots.length).toBeGreaterThanOrEqual(3) + + // Find the running snapshots with metrics + const runningSnapshots = progressSnapshots.filter((p) => p.c1.status === 'running') + expect(runningSnapshots.length).toBe(2) + expect(runningSnapshots[0].c1.readRows).toBe(100) + expect(runningSnapshots[0].c1.elapsedMs).toBe(1000) + expect(runningSnapshots[1].c1.readRows).toBe(500) + expect(runningSnapshots[1].c1.writtenRows).toBe(50) + expect(runningSnapshots[1].c1.elapsedMs).toBe(3000) + + // Final snapshot should be done + const lastSnapshot = progressSnapshots[progressSnapshots.length - 1] + expect(lastSnapshot.c1.status).toBe('done') + expect(lastSnapshot.c1.writtenRows).toBe(200) + }) + test('resumes from saved progress', async () => { const queryId = `backfill-${PLAN_ID}-c2` const statuses = new Map([ diff --git a/packages/plugin-backfill/src/async-backfill.ts b/packages/plugin-backfill/src/async-backfill.ts index 7c92260..d86fa44 100644 --- a/packages/plugin-backfill/src/async-backfill.ts +++ b/packages/plugin-backfill/src/async-backfill.ts @@ -29,9 +29,12 @@ export interface BackfillChunkState { queryId?: string submittedAt?: string finishedAt?: string - durationMs?: number + readRows?: number + readBytes?: number writtenRows?: number writtenBytes?: number + elapsedMs?: number + durationMs?: number error?: string } @@ -58,7 +61,21 @@ function applyQueryStatus( qs: QueryStatus, ): { state: BackfillChunkState; changed: boolean } { if (qs.status === 'running') { - return { state: { ...state, status: 'running' }, changed: state.status !== 'running' } + const next: BackfillChunkState = { + ...state, + status: 'running', + readRows: qs.readRows, + readBytes: qs.readBytes, + writtenRows: qs.writtenRows, + writtenBytes: qs.writtenBytes, + elapsedMs: qs.elapsedMs, + } + const metricsChanged = + state.status !== 'running' || + state.readRows !== qs.readRows || + state.writtenRows !== qs.writtenRows || + state.elapsedMs !== qs.elapsedMs + return { state: next, changed: metricsChanged } } if (qs.status === 'finished') { return { From da0a9bd3fde132c44021f61e0a50013283a12a5d Mon Sep 17 00:00:00 2001 From: KeKs0r Date: Sat, 28 Mar 2026 21:24:48 +0100 Subject: [PATCH 2/2] fix(clickhouse): remove invalid LIMIT clause from clusterAllReplicas query LIMIT after SETTINGS is a syntax error in ClickHouse distributed queries, causing queryStatus() to fail on every poll and backfill chunks to be marked as failed after maxPollErrors consecutive exceptions. Co-Authored-By: Claude Opus 4.6 --- packages/clickhouse/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/clickhouse/src/index.ts b/packages/clickhouse/src/index.ts index 704b371..2626b24 100644 --- a/packages/clickhouse/src/index.ts +++ b/packages/clickhouse/src/index.ts @@ -276,7 +276,7 @@ export function createClickHouseExecutor(config: NonNullable { try { const running = await client.query({ - query: `SELECT read_rows, read_bytes, written_rows, written_bytes, elapsed FROM clusterAllReplicas('parallel_replicas', system.processes) WHERE query_id = {qid:String} SETTINGS skip_unavailable_shards = 1 LIMIT 1`, + query: `SELECT read_rows, read_bytes, written_rows, written_bytes, elapsed FROM clusterAllReplicas('parallel_replicas', system.processes) WHERE query_id = {qid:String} SETTINGS skip_unavailable_shards = 1`, query_params: { qid: queryId }, format: 'JSONEachRow', })