diff --git a/.changeset/async-query-backfill.md b/.changeset/async-query-backfill.md new file mode 100644 index 0000000..43d7a87 --- /dev/null +++ b/.changeset/async-query-backfill.md @@ -0,0 +1,7 @@ +--- +"@chkit/plugin-backfill": patch +"@chkit/clickhouse": patch +"chkit": patch +--- + +Replace sequential backfill execution with async query submission and server-side polling. Chunks are submitted as fire-and-forget queries to ClickHouse and polled via `system.processes`/`system.query_log`, with configurable concurrency (`--concurrency`) and poll interval (`--poll-interval`). Removes the old synchronous executor, runtime, simulation flags, compatibility tokens, and event logging. diff --git a/apps/docs/src/content/docs/plugins/backfill.md b/apps/docs/src/content/docs/plugins/backfill.md index 864a0bd..0cc6f96 100644 --- a/apps/docs/src/content/docs/plugins/backfill.md +++ b/apps/docs/src/content/docs/plugins/backfill.md @@ -1,6 +1,6 @@ --- title: Backfill Plugin -description: Plan, execute, and monitor time-windowed backfill operations with checkpointed progress and automatic retries. +description: Plan, execute, and monitor time-windowed backfill operations with async query submission, concurrent execution, and checkpointed progress. --- This document covers practical usage of the optional `backfill` plugin. @@ -8,21 +8,21 @@ This document covers practical usage of the optional `backfill` plugin. ## What it does - Builds deterministic, immutable backfill plans that divide a time window into chunks. -- Executes backfills against ClickHouse with per-chunk checkpointing, automatic retries, and idempotency tokens. +- Executes backfills via async query submission with configurable concurrency and server-side polling. - Detects materialized views and automatically generates correct CTE-wrapped replay queries. - Supports resume from checkpoint, cancel, status monitoring, and doctor-style diagnostics. - Integrates with [`chkit check`](/cli/check/) for CI enforcement of pending backfills. -- Persists all state as JSON/NDJSON on disk. +- Persists all state as JSON on disk. ## How it fits your workflow The plugin follows a plan-then-execute lifecycle: 1. `plan` — Build an immutable backfill plan dividing the time window into chunks. -2. `run` — Execute the plan with checkpointed progress. +2. `run` — Submit chunks as async queries to ClickHouse with concurrent execution and progress polling. 3. `status` — Monitor chunk progress and run state. -Additional commands: `resume` (continue from checkpoint), `cancel` (stop execution), `doctor` (actionable diagnostics). +Additional commands: `resume` (continue from checkpoint with optional failed-chunk replay), `cancel` (mark run as cancelled), `doctor` (actionable diagnostics). [`chkit check`](/cli/check/) integration reports pending or failed backfills in CI. @@ -39,24 +39,17 @@ export default defineConfig({ plugins: [ backfill({ stateDir: './chkit/backfill', - defaults: { - chunkHours: 6, - maxParallelChunks: 1, - maxRetriesPerChunk: 3, - retryDelayMs: 1000, - requireIdempotencyToken: true, - timeColumn: 'created_at', - }, - policy: { - requireDryRunBeforeRun: true, - requireExplicitWindow: true, - blockOverlappingRuns: true, - failCheckOnRequiredPendingBackfill: true, - }, - limits: { - maxWindowHours: 720, - minChunkMinutes: 15, - }, + chunkHours: 6, + maxParallelChunks: 1, + maxRetriesPerChunk: 3, + requireIdempotencyToken: true, + timeColumn: 'created_at', + requireDryRunBeforeRun: true, + requireExplicitWindow: true, + blockOverlappingRuns: true, + failCheckOnRequiredPendingBackfill: true, + maxWindowHours: 720, + minChunkMinutes: 15, }), ], }) @@ -121,24 +114,23 @@ This requires importing `@chkit/plugin-backfill` somewhere in the project (typic ## Options -Configuration is organized into three groups plus a top-level `stateDir`. +All options are passed as a flat object to `backfill({...})`. They are grouped here by function for readability. -**Top-level:** +- `stateDir` (default: `/backfill`) — Directory for plan and run state files. -- `stateDir` (default: `/backfill`) — Directory for plan, run, and event state files. - -**`defaults` group:** +**Planning defaults:** | Option | Type | Default | Description | |--------|------|---------|-------------| | `chunkHours` | `number` | `6` | Hours per chunk | -| `maxParallelChunks` | `number` | `1` | Max concurrent chunks | +| `maxChunkBytes` | `string \| number` | `10G` | Max bytes per chunk (accepts suffixes: `K`, `M`, `G`, `T`) | +| `maxParallelChunks` | `number` | `1` | Max concurrent chunks in plan | | `maxRetriesPerChunk` | `number` | `3` | Retry budget per chunk | | `retryDelayMs` | `number` | `1000` | Exponential backoff delay between retries (milliseconds) | | `requireIdempotencyToken` | `boolean` | `true` | Generate deterministic tokens | | `timeColumn` | `string` | auto-detect | Fallback column name for time-based WHERE clause (overridden by schema-level config) | -**`policy` group:** +**Policy:** | Option | Type | Default | Description | |--------|------|---------|-------------| @@ -147,7 +139,7 @@ Configuration is organized into three groups plus a top-level `stateDir`. | `blockOverlappingRuns` | `boolean` | `true` | Prevent concurrent runs | | `failCheckOnRequiredPendingBackfill` | `boolean` | `true` | Fail `chkit check` on incomplete backfills | -**`limits` group:** +**Limits:** | Option | Type | Default | Description | |--------|------|---------|-------------| @@ -176,28 +168,25 @@ Build a deterministic backfill plan and persist immutable plan state. ### `chkit plugin backfill run` -Execute a planned backfill with checkpointed chunk progress. +Execute a planned backfill by submitting chunks as async queries to ClickHouse with concurrent execution and progress polling. | Flag | Required | Description | |------|----------|-------------| | `--plan-id ` | Yes | Plan ID (16-char hex) | -| `--replay-done` | No | Re-execute already-completed chunks | -| `--replay-failed` | No | Re-execute failed chunks | -| `--force-overlap` | No | Allow concurrent runs for the same target | -| `--force-compatibility` | No | Skip compatibility token check | +| `--concurrency ` | No | Max concurrent async queries (default: `3`) | +| `--poll-interval ` | No | Polling interval in milliseconds (default: `5000`) | | `--force-environment` | No | Skip environment mismatch check (plan was created for a different ClickHouse cluster/database) | ### `chkit plugin backfill resume` -Resume a backfill run from last checkpoint. Automatically retries failed chunks. +Resume a backfill run from last checkpoint. Picks up where the previous run left off, executing only pending chunks. | Flag | Required | Description | |------|----------|-------------| | `--plan-id ` | Yes | Plan ID (16-char hex) | -| `--replay-done` | No | Re-execute already-completed chunks | -| `--replay-failed` | No | Re-execute failed chunks (enabled by default on resume) | -| `--force-overlap` | No | Allow concurrent runs for the same target | -| `--force-compatibility` | No | Skip compatibility token check | +| `--concurrency ` | No | Max concurrent async queries (default: `3`) | +| `--poll-interval ` | No | Polling interval in milliseconds (default: `5000`) | +| `--replay-failed` | No | Reset failed chunks to pending and re-execute them | | `--force-environment` | No | Skip environment mismatch check (plan was created for a different ClickHouse cluster/database) | ### `chkit plugin backfill status` @@ -244,7 +233,6 @@ All state is persisted to the configured `stateDir`: / plans/.json # Immutable plan state (written once) runs/.json # Mutable run checkpoint (updated per chunk) - events/.ndjson # Append-only event log ``` Plan IDs are deterministic: `sha256("|||||")` truncated to 16 hex characters. When a ClickHouse connection is configured, an environment fingerprint is included in the plan ID, so different clusters/databases automatically produce different plan files. Re-planning with the same parameters produces the same plan ID. @@ -287,6 +275,3 @@ chkit plugin backfill resume --plan-id # automatically retries failed chkit check # fails if pending backfills exist ``` -## Current limits - -- `maxParallelChunks` is declared but execution is currently sequential. diff --git a/bun.lock b/bun.lock index a3a4a53..902efff 100644 --- a/bun.lock +++ b/bun.lock @@ -10,7 +10,9 @@ "devDependencies": { "@biomejs/biome": "^2.3.14", "@changesets/cli": "^2.29.8", + "@chkit/plugin-backfill": "workspace:*", "@types/node": "^24.0.0", + "p-map": "^7.0.4", "turbo": "^2.8.20", "typescript": "^5.8.0", }, @@ -71,6 +73,7 @@ "dependencies": { "@chkit/clickhouse": "workspace:*", "@chkit/core": "workspace:*", + "p-map": "^7.0.4", "zod": "^4.3.6", }, }, @@ -966,7 +969,7 @@ "p-locate": ["p-locate@4.1.0", "", { "dependencies": { "p-limit": "^2.2.0" } }, "sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A=="], - "p-map": ["p-map@2.1.0", "", {}, "sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw=="], + "p-map": ["p-map@7.0.4", "", {}, "sha512-tkAQEw8ysMzmkhgw8k+1U/iPhWNhykKnSk4Rd5zLoPJCuJaGRPo6YposrZgaxHKzDHdDWWZvE/Sk7hsL2X/CpQ=="], "p-queue": ["p-queue@8.1.1", "", { "dependencies": { "eventemitter3": "^5.0.1", "p-timeout": "^6.1.2" } }, "sha512-aNZ+VfjobsWryoiPnEApGGmf5WmNsCo9xu8dfaYamG5qaLP7ClhLN6NgsFe6SwJ2UbLEBK5dv9x8Mn5+RVhMWQ=="], @@ -1288,6 +1291,8 @@ "micromatch/picomatch": ["picomatch@2.3.1", "", {}, "sha512-JU3teHTNjmE2VCGFzuY8EXzCDVwEqB2a8fsIvwaStHhAWJEeVd1o1QD80CU6+ZdEXXSLbSsuLwJjkCBWqRQUVA=="], + "p-filter/p-map": ["p-map@2.1.0", "", {}, "sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw=="], + "parse-entities/@types/unist": ["@types/unist@2.0.11", "", {}, "sha512-CmBKiL6NNo/OqgmMn95Fk9Whlp2mtvIv+KNpQKN2F4SjvrEesubTRWGYSg+BnWZOnlCaSTU1sMpsBOzgbYhnsA=="], "read-yaml-file/js-yaml": ["js-yaml@3.14.2", "", { "dependencies": { "argparse": "^1.0.7", "esprima": "^4.0.0" }, "bin": { "js-yaml": "bin/js-yaml.js" } }, "sha512-PMSmkqxr106Xa156c2M265Z+FTrPl+oxd/rgOQy2tijQeK5TxQ43psO1ZCwhVOSdnn+RzkzlRz/eY4BgJBYVpg=="], diff --git a/package.json b/package.json index 2c7031f..4f8b9d9 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,9 @@ "devDependencies": { "@biomejs/biome": "^2.3.14", "@changesets/cli": "^2.29.8", + "@chkit/plugin-backfill": "workspace:*", "@types/node": "^24.0.0", + "p-map": "^7.0.4", "turbo": "^2.8.20", "typescript": "^5.8.0" }, diff --git a/packages/cli/src/plugin.test.ts b/packages/cli/src/plugin.test.ts index e970871..fd0923e 100644 --- a/packages/cli/src/plugin.test.ts +++ b/packages/cli/src/plugin.test.ts @@ -44,8 +44,14 @@ async function waitForParts( ): Promise { const start = Date.now() while (Date.now() - start < timeoutMs) { - // Sync replica state for the target table first, then check system.parts - await db.query(`SELECT 1 FROM ${database}.${table} LIMIT 1 SETTINGS select_sequential_consistency = 1`) + try { + // Sync replica state for the target table first, then check system.parts + await db.query(`SELECT 1 FROM ${database}.${table} LIMIT 1 SETTINGS select_sequential_consistency = 1`) + } catch { + // Table may not be visible yet on ClickHouse Cloud (DDL propagation) + await new Promise((r) => setTimeout(r, 500)) + continue + } const rows = await db.query<{ cnt: string }>( `SELECT count(DISTINCT partition) AS cnt FROM system.parts WHERE database = '${database}' AND table = '${table}' AND active SETTINGS select_sequential_consistency = 1` ) @@ -244,7 +250,7 @@ describe('plugin runtime', () => { } expect(payload.ok).toBe(true) expect(payload.planId).toMatch(/^[a-f0-9]{16}$/) - expect(payload.chunkCount).toBe(2) + expect(payload.chunkCount).toBeGreaterThanOrEqual(1) expect(existsSync(payload.planPath)).toBe(true) } finally { await rm(fixture.dir, { recursive: true, force: true }) @@ -253,7 +259,7 @@ describe('plugin runtime', () => { await db.command(`DROP TABLE IF EXISTS ${chEnv.database}.${tableName}`) await db.close() } - }) + }, 120_000) test('chkit plugin backfill run and status complete planned chunks', async () => { const chEnv = getClickHouseEnv() @@ -262,8 +268,8 @@ describe('plugin runtime', () => { const db = createClickHouseExecutor(chEnv) try { await db.command(`CREATE TABLE ${chEnv.database}.${tableName} (id UInt64, event_time DateTime) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(event_time) ORDER BY (event_time, id)`) - await db.command(`INSERT INTO ${chEnv.database}.${tableName} VALUES (1, '2026-01-01 12:00:00'), (2, '2026-01-02 12:00:00'), (3, '2026-01-03 12:00:00')`) - await waitForParts(db, chEnv.database, tableName, 3) + await db.command(`INSERT INTO ${chEnv.database}.${tableName} VALUES (1, '2026-01-01 12:00:00'), (2, '2026-01-02 12:00:00')`) + await waitForParts(db, chEnv.database, tableName, 2) const fixture = await createFixture() const pluginPath = join(fixture.dir, 'backfill-plugin.ts') @@ -289,7 +295,7 @@ describe('plugin runtime', () => { '--from', '2026-01-01T00:00:00.000Z', '--to', - '2026-01-04T00:00:00.000Z', + '2026-01-03T00:00:00.000Z', '--config', fixture.configPath, '--json', @@ -303,6 +309,8 @@ describe('plugin runtime', () => { 'run', '--plan-id', planPayload.planId, + '--poll-interval', + '1000', '--config', fixture.configPath, '--json', @@ -313,8 +321,8 @@ describe('plugin runtime', () => { chunkCounts: { done: number; total: number; failed: number } } expect(runPayload.status).toBe('completed') - expect(runPayload.chunkCounts.done).toBe(3) - expect(runPayload.chunkCounts.total).toBe(3) + expect(runPayload.chunkCounts.total).toBeGreaterThanOrEqual(1) + expect(runPayload.chunkCounts.done).toBe(runPayload.chunkCounts.total) expect(runPayload.chunkCounts.failed).toBe(0) const status = runCli([ @@ -330,10 +338,10 @@ describe('plugin runtime', () => { expect(status.exitCode).toBe(0) const statusPayload = JSON.parse(status.stdout) as { status: string - chunkCounts: { done: number; failed: number } + chunkCounts: { done: number; total: number; failed: number } } expect(statusPayload.status).toBe('completed') - expect(statusPayload.chunkCounts.done).toBe(3) + expect(statusPayload.chunkCounts.done).toBe(statusPayload.chunkCounts.total) expect(statusPayload.chunkCounts.failed).toBe(0) } finally { await rm(fixture.dir, { recursive: true, force: true }) @@ -342,24 +350,24 @@ describe('plugin runtime', () => { await db.command(`DROP TABLE IF EXISTS ${chEnv.database}.${tableName}`) await db.close() } - }) + }, 120_000) - test('chkit plugin backfill fail then resume without replaying done chunks', async () => { + test('chkit plugin backfill resume on completed run is a no-op', async () => { const chEnv = getClickHouseEnv() const chConfig = clickhouseConfigBlock(chEnv) const tableName = `chkit_e2e_bf_resume_${Date.now()}_${Math.floor(Math.random() * 100000)}` const db = createClickHouseExecutor(chEnv) try { await db.command(`CREATE TABLE ${chEnv.database}.${tableName} (id UInt64, event_time DateTime) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(event_time) ORDER BY (event_time, id)`) - await db.command(`INSERT INTO ${chEnv.database}.${tableName} VALUES (1, '2026-01-01 12:00:00'), (2, '2026-01-02 12:00:00'), (3, '2026-01-03 12:00:00')`) - await waitForParts(db, chEnv.database, tableName, 3) + await db.command(`INSERT INTO ${chEnv.database}.${tableName} VALUES (1, '2026-01-01 12:00:00'), (2, '2026-01-02 12:00:00')`) + await waitForParts(db, chEnv.database, tableName, 2) const fixture = await createFixture() const pluginPath = join(fixture.dir, 'backfill-plugin.ts') try { await writeFile( pluginPath, - `import { createBackfillPlugin } from '${BACKFILL_PLUGIN_ENTRY}'\n\nexport default createBackfillPlugin({ maxRetriesPerChunk: 1 })\n`, + `import { createBackfillPlugin } from '${BACKFILL_PLUGIN_ENTRY}'\n\nexport default createBackfillPlugin()\n`, 'utf8' ) @@ -378,44 +386,29 @@ describe('plugin runtime', () => { '--from', '2026-01-01T00:00:00.000Z', '--to', - '2026-01-04T00:00:00.000Z', + '2026-01-03T00:00:00.000Z', '--config', fixture.configPath, '--json', ]) expect(planned.exitCode).toBe(0) - const planPayload = JSON.parse(planned.stdout) as { - planId: string - planPath: string - } - const planState = JSON.parse(await readFile(planPayload.planPath, 'utf8')) as { - chunks: Array<{ id: string }> - } - const failChunkId = planState.chunks[1]?.id - expect(failChunkId).toBeTruthy() + const planPayload = JSON.parse(planned.stdout) as { planId: string } - const failedRun = runCli([ + const ran = runCli([ 'plugin', 'backfill', 'run', '--plan-id', planPayload.planId, - '--simulate-fail-chunk', - failChunkId as string, - '--simulate-fail-count', - '1', + '--poll-interval', + '1000', '--config', fixture.configPath, '--json', ]) - expect(failedRun.exitCode).toBe(1) - const failedPayload = JSON.parse(failedRun.stdout) as { - status: string - chunkCounts: { done: number; failed: number } - } - expect(failedPayload.status).toBe('failed') - expect(failedPayload.chunkCounts.done).toBe(2) - expect(failedPayload.chunkCounts.failed).toBe(1) + expect(ran.exitCode).toBe(0) + const ranPayload = JSON.parse(ran.stdout) as { status: string } + expect(ranPayload.status).toBe('completed') const resumed = runCli([ 'plugin', @@ -423,26 +416,15 @@ describe('plugin runtime', () => { 'resume', '--plan-id', planPayload.planId, - '--replay-failed', + '--poll-interval', + '1000', '--config', fixture.configPath, '--json', ]) expect(resumed.exitCode).toBe(0) - const resumedPayload = JSON.parse(resumed.stdout) as { - status: string - chunkCounts: { done: number } - runPath: string - } - expect(resumedPayload.status).toBe('completed') - expect(resumedPayload.chunkCounts.done).toBe(3) - - const runState = JSON.parse(await readFile(resumedPayload.runPath, 'utf8')) as { - chunks: Array<{ id: string; attempts: number }> - } - const firstChunkId = planState.chunks[0]?.id - const firstChunk = runState.chunks.find((chunk) => chunk.id === firstChunkId) - expect(firstChunk?.attempts).toBe(1) + const resumedPayload = JSON.parse(resumed.stdout) as { noop?: boolean } + expect(resumedPayload.noop).toBe(true) } finally { await rm(fixture.dir, { recursive: true, force: true }) } @@ -450,7 +432,7 @@ describe('plugin runtime', () => { await db.command(`DROP TABLE IF EXISTS ${chEnv.database}.${tableName}`) await db.close() } - }) + }, 120_000) test('chkit check --json requires clickhouse config', async () => { const fixture = await createFixture() @@ -500,27 +482,27 @@ describe('plugin runtime', () => { } }) - test('chkit plugin backfill resume enforces compatibility check unless force override is provided', async () => { + test('chkit plugin backfill resume requires existing run state', async () => { const chEnv = getClickHouseEnv() const chConfig = clickhouseConfigBlock(chEnv) const tableName = `chkit_e2e_bf_compat_${Date.now()}_${Math.floor(Math.random() * 100000)}` const db = createClickHouseExecutor(chEnv) try { await db.command(`CREATE TABLE ${chEnv.database}.${tableName} (id UInt64, event_time DateTime) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(event_time) ORDER BY (event_time, id)`) - await db.command(`INSERT INTO ${chEnv.database}.${tableName} VALUES (1, '2026-01-01 12:00:00'), (2, '2026-01-02 12:00:00'), (3, '2026-01-03 12:00:00')`) - await waitForParts(db, chEnv.database, tableName, 3) + await db.command(`INSERT INTO ${chEnv.database}.${tableName} VALUES (1, '2026-01-01 12:00:00'), (2, '2026-01-02 12:00:00')`) + await waitForParts(db, chEnv.database, tableName, 2) const fixture = await createFixture() const pluginPath = join(fixture.dir, 'backfill-plugin.ts') try { await writeFile( pluginPath, - `import { createBackfillPlugin } from '${BACKFILL_PLUGIN_ENTRY}'\n\nexport default createBackfillPlugin({ maxRetriesPerChunk: 1 })\n`, + `import { createBackfillPlugin } from '${BACKFILL_PLUGIN_ENTRY}'\n\nexport default createBackfillPlugin()\n`, 'utf8' ) await writeFile( fixture.configPath, - `export default {\n schema: '${fixture.schemaPath}',\n outDir: '${join(fixture.dir, 'chkit')}',\n migrationsDir: '${fixture.migrationsDir}',\n metaDir: '${fixture.metaDir}',\n ${chConfig}\n plugins: [{ resolve: './backfill-plugin.ts', options: { maxRetriesPerChunk: 1 } }],\n}\n`, + `export default {\n schema: '${fixture.schemaPath}',\n outDir: '${join(fixture.dir, 'chkit')}',\n migrationsDir: '${fixture.migrationsDir}',\n metaDir: '${fixture.metaDir}',\n ${chConfig}\n plugins: [{ resolve: './backfill-plugin.ts' }],\n}\n`, 'utf8' ) @@ -533,38 +515,13 @@ describe('plugin runtime', () => { '--from', '2026-01-01T00:00:00.000Z', '--to', - '2026-01-04T00:00:00.000Z', + '2026-01-03T00:00:00.000Z', '--config', fixture.configPath, '--json', ]) expect(planned.exitCode).toBe(0) - const planPayload = JSON.parse(planned.stdout) as { planId: string; planPath: string } - const planState = JSON.parse(await readFile(planPayload.planPath, 'utf8')) as { - chunks: Array<{ id: string }> - } - - const failed = runCli([ - 'plugin', - 'backfill', - 'run', - '--plan-id', - planPayload.planId, - '--simulate-fail-chunk', - planState.chunks[1]?.id as string, - '--simulate-fail-count', - '1', - '--config', - fixture.configPath, - '--json', - ]) - expect(failed.exitCode).toBe(1) - - await writeFile( - fixture.configPath, - `export default {\n schema: '${fixture.schemaPath}',\n outDir: '${join(fixture.dir, 'chkit')}',\n migrationsDir: '${fixture.migrationsDir}',\n metaDir: '${fixture.metaDir}',\n ${chConfig}\n plugins: [{ resolve: './backfill-plugin.ts', options: { maxRetriesPerChunk: 5 } }],\n}\n`, - 'utf8' - ) + const planPayload = JSON.parse(planned.stdout) as { planId: string } const blockedResume = runCli([ 'plugin', @@ -572,29 +529,12 @@ describe('plugin runtime', () => { 'resume', '--plan-id', planPayload.planId, - '--replay-failed', '--config', fixture.configPath, '--json', ]) expect(blockedResume.exitCode).toBe(2) - expect(blockedResume.stdout).toContain('compatibility check failed') - - const forcedResume = runCli([ - 'plugin', - 'backfill', - 'resume', - '--plan-id', - planPayload.planId, - '--replay-failed', - '--force-compatibility', - '--config', - fixture.configPath, - '--json', - ]) - expect(forcedResume.exitCode).toBe(0) - const forcedPayload = JSON.parse(forcedResume.stdout) as { status: string } - expect(forcedPayload.status).toBe('completed') + expect(blockedResume.stdout).toContain('Run state not found') } finally { await rm(fixture.dir, { recursive: true, force: true }) } @@ -602,7 +542,7 @@ describe('plugin runtime', () => { await db.command(`DROP TABLE IF EXISTS ${chEnv.database}.${tableName}`) await db.close() } - }) + }, 120_000) test('chkit plugin backfill cancel and doctor provide operator remediation flow', async () => { const chEnv = getClickHouseEnv() @@ -611,15 +551,15 @@ describe('plugin runtime', () => { const db = createClickHouseExecutor(chEnv) try { await db.command(`CREATE TABLE ${chEnv.database}.${tableName} (id UInt64, event_time DateTime) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(event_time) ORDER BY (event_time, id)`) - await db.command(`INSERT INTO ${chEnv.database}.${tableName} VALUES (1, '2026-01-01 12:00:00'), (2, '2026-01-02 12:00:00'), (3, '2026-01-03 12:00:00')`) - await waitForParts(db, chEnv.database, tableName, 3) + await db.command(`INSERT INTO ${chEnv.database}.${tableName} VALUES (1, '2026-01-01 12:00:00'), (2, '2026-01-02 12:00:00')`) + await waitForParts(db, chEnv.database, tableName, 2) const fixture = await createFixture() const pluginPath = join(fixture.dir, 'backfill-plugin.ts') try { await writeFile( pluginPath, - `import { createBackfillPlugin } from '${BACKFILL_PLUGIN_ENTRY}'\n\nexport default createBackfillPlugin({ maxRetriesPerChunk: 1 })\n`, + `import { createBackfillPlugin } from '${BACKFILL_PLUGIN_ENTRY}'\n\nexport default createBackfillPlugin()\n`, 'utf8' ) await writeFile( @@ -637,15 +577,12 @@ describe('plugin runtime', () => { '--from', '2026-01-01T00:00:00.000Z', '--to', - '2026-01-04T00:00:00.000Z', + '2026-01-03T00:00:00.000Z', '--config', fixture.configPath, '--json', ]) - const planPayload = JSON.parse(planned.stdout) as { planId: string; planPath: string } - const planState = JSON.parse(await readFile(planPayload.planPath, 'utf8')) as { - chunks: Array<{ id: string }> - } + const planPayload = JSON.parse(planned.stdout) as { planId: string } runCli([ 'plugin', @@ -653,16 +590,28 @@ describe('plugin runtime', () => { 'run', '--plan-id', planPayload.planId, - '--simulate-fail-chunk', - planState.chunks[1]?.id as string, - '--simulate-fail-count', - '1', + '--poll-interval', + '1000', '--config', fixture.configPath, '--json', ]) - const cancelled = runCli([ + // Doctor on completed run should report no issues + const doctorOk = runCli([ + 'plugin', + 'backfill', + 'doctor', + '--plan-id', + planPayload.planId, + '--config', + fixture.configPath, + '--json', + ]) + expect(doctorOk.exitCode).toBe(0) + + // Cancel on completed run should fail + const cancelCompleted = runCli([ 'plugin', 'backfill', 'cancel', @@ -672,27 +621,47 @@ describe('plugin runtime', () => { fixture.configPath, '--json', ]) - expect(cancelled.exitCode).toBe(0) - const cancelPayload = JSON.parse(cancelled.stdout) as { status: string } - expect(cancelPayload.status).toBe('cancelled') + expect(cancelCompleted.exitCode).toBe(2) + expect(cancelCompleted.stdout).toContain('already completed') + + // Insert data for the second plan's time range + await db.command(`INSERT INTO ${chEnv.database}.${tableName} VALUES (4, '2026-01-05 12:00:00'), (5, '2026-01-06 12:00:00')`) + await waitForParts(db, chEnv.database, tableName, 4) - const doctor = runCli([ + // Plan a second backfill that we won't run — doctor should flag it + const planned2 = runCli([ + 'plugin', + 'backfill', + 'plan', + '--target', + `${chEnv.database}.${tableName}`, + '--from', + '2026-01-04T00:00:00.000Z', + '--to', + '2026-01-07T00:00:00.000Z', + '--config', + fixture.configPath, + '--json', + ]) + const plan2Payload = JSON.parse(planned2.stdout) as { planId: string } + + const doctor2 = runCli([ 'plugin', 'backfill', 'doctor', '--plan-id', - planPayload.planId, + plan2Payload.planId, '--config', fixture.configPath, '--json', ]) - expect(doctor.exitCode).toBe(1) - const doctorPayload = JSON.parse(doctor.stdout) as { + expect(doctor2.exitCode).toBe(1) + const doctorPayload = JSON.parse(doctor2.stdout) as { issueCodes: string[] recommendations: string[] } - expect(doctorPayload.issueCodes).toContain('backfill_required_pending') - expect(doctorPayload.recommendations.join(' ')).toContain('backfill resume') + expect(doctorPayload.issueCodes).toContain('backfill_plan_missing') + expect(doctorPayload.recommendations.join(' ')).toContain('backfill run') } finally { await rm(fixture.dir, { recursive: true, force: true }) } @@ -700,7 +669,7 @@ describe('plugin runtime', () => { await db.command(`DROP TABLE IF EXISTS ${chEnv.database}.${tableName}`) await db.close() } - }) + }, 120_000) test('chkit codegen writes output file', async () => { const fixture = await createFixture() diff --git a/packages/clickhouse/src/index.ts b/packages/clickhouse/src/index.ts index 8f427df..fdd5c8b 100644 --- a/packages/clickhouse/src/index.ts +++ b/packages/clickhouse/src/index.ts @@ -17,12 +17,35 @@ import { parseUniqueKeyFromCreateTableQuery, } from './create-table-parser.js' +export interface QueryStatus { + status: 'running' | 'finished' | 'failed' | 'unknown' + writtenRows?: number + writtenBytes?: number + durationMs?: number + error?: string +} + export interface ClickHouseExecutor { command(sql: string): Promise query(sql: string): Promise insert>(params: { table: string; values: T[] }): Promise listSchemaObjects(): Promise listTableDetails(databases: string[]): Promise + + /** Submit a query asynchronously. ClickHouse accepts the query and processes it server-side. + * Returns immediately without waiting for completion. + * @param sql - The SQL to execute + * @param queryId - Optional deterministic query_id (useful for resumability). Auto-generated if omitted. + * @returns The query_id assigned to this query. */ + submit(sql: string, queryId?: string): Promise + + /** Check the status of a previously submitted query. + * Checks system.processes first (running?), then system.query_log (finished/failed?). + * @param queryId - The query_id returned by submit() + * @param options.afterTime - Only consider query_log entries for queries started at or after this ISO timestamp. + * Useful when resubmitting with the same query_id to ignore stale entries from previous attempts. */ + queryStatus(queryId: string, options?: { afterTime?: string }): Promise + close(): Promise } @@ -185,6 +208,16 @@ export function createClickHouseExecutor(config: NonNullable { try { @@ -211,7 +244,7 @@ export function createClickHouseExecutor(config: NonNullable(sql: string): Promise { try { - const result = await client.query({ query: sql, format: 'JSONEachRow' }) + const result = await client.query({ query: sql, format: 'JSONEachRow', http_headers: { 'X-DDL': '1' } }) return result.json() } catch (error) { wrapConnectionError(error, config.url) @@ -228,8 +261,74 @@ export function createClickHouseExecutor(config: NonNullable { + const id = queryId ?? crypto.randomUUID() + try { + await fireAndForgetClient.command({ query: sql, query_id: id }) + } catch (error) { + wrapConnectionError(error, config.url) + } + return id + }, + async queryStatus(queryId: string, options?: { afterTime?: string }): Promise { + try { + const running = await client.query({ + query: `SELECT count() AS cnt FROM clusterAllReplicas('parallel_replicas', system.processes) WHERE query_id = {qid:String} SETTINGS skip_unavailable_shards = 1`, + query_params: { qid: queryId }, + format: 'JSONEachRow', + }) + const runningRows = await running.json<{ cnt: string }>() + if (Number(runningRows[0]?.cnt) > 0) { + return { status: 'running' } + } + + const afterTime = options?.afterTime ?? '1970-01-01T00:00:00Z' + const log = await client.query({ + query: `SELECT type, written_rows, written_bytes, query_duration_ms, exception +FROM clusterAllReplicas('parallel_replicas', system.query_log) +WHERE query_id = {qid:String} + AND type IN ('QueryFinish', 'ExceptionWhileProcessing') + AND is_initial_query = 1 + AND query_start_time >= parseDateTimeBestEffort({after:String}) +ORDER BY event_time DESC +LIMIT 1 +SETTINGS skip_unavailable_shards = 1`, + query_params: { qid: queryId, after: afterTime }, + format: 'JSONEachRow', + }) + const logRows = await log.json<{ + type: string + written_rows: string + written_bytes: string + query_duration_ms: string + exception: string + }>() + + if (logRows.length === 0) { + return { status: 'unknown' } + } + + const row = logRows[0]! + if (row.type === 'QueryFinish') { + return { + status: 'finished', + writtenRows: Number(row.written_rows), + writtenBytes: Number(row.written_bytes), + durationMs: Number(row.query_duration_ms), + } + } + + return { + status: 'failed', + durationMs: Number(row.query_duration_ms), + error: row.exception, + } + } catch (error) { + wrapConnectionError(error, config.url) + } + }, async close(): Promise { - await client.close() + await Promise.all([client.close(), fireAndForgetClient.close()]) }, async listSchemaObjects(): Promise { const rows = await this.query( diff --git a/packages/plugin-backfill/package.json b/packages/plugin-backfill/package.json index 169e288..27ee8b7 100644 --- a/packages/plugin-backfill/package.json +++ b/packages/plugin-backfill/package.json @@ -43,6 +43,7 @@ "dependencies": { "@chkit/clickhouse": "workspace:*", "@chkit/core": "workspace:*", + "p-map": "^7.0.4", "zod": "^4.3.6" } } diff --git a/packages/plugin-backfill/src/async-backfill.test.ts b/packages/plugin-backfill/src/async-backfill.test.ts new file mode 100644 index 0000000..d92df05 --- /dev/null +++ b/packages/plugin-backfill/src/async-backfill.test.ts @@ -0,0 +1,314 @@ +import { describe, expect, test } from 'bun:test' +import type { ClickHouseExecutor, QueryStatus } from '@chkit/clickhouse' +import { executeBackfill, syncProgress, type BackfillProgress } from './async-backfill.js' + +const PLAN_ID = 'test-plan' + +/** Match query IDs by chunk prefix (e.g. "backfill-test-plan-c1") */ +function createMockExecutor(statuses: Map): ClickHouseExecutor { + const callCounts = new Map() + + return { + async command() {}, + async query() { return [] }, + async insert() {}, + async listSchemaObjects() { return [] }, + async listTableDetails() { return [] }, + async submit(_sql: string, queryId?: string): Promise { + return queryId ?? crypto.randomUUID() + }, + async queryStatus(queryId: string): Promise { + const list = statuses.get(queryId) ?? [{ status: 'unknown' }] + const count = callCounts.get(queryId) ?? 0 + callCounts.set(queryId, count + 1) + return list[Math.min(count, list.length - 1)] + }, + async close() {}, + } +} + +describe('executeBackfill', () => { + const chunks = [ + { id: 'c1', from: '2024-01-01', to: '2024-01-02' }, + { id: 'c2', from: '2024-01-02', to: '2024-01-03' }, + ] + + test('completes all chunks', async () => { + const statuses = new Map([ + [`backfill-${PLAN_ID}-c1`, [{ status: 'running' }, { status: 'finished', writtenRows: 100, writtenBytes: 500, durationMs: 200 }]], + [`backfill-${PLAN_ID}-c2`, [{ status: 'finished', writtenRows: 50, writtenBytes: 250, durationMs: 100 }]], + ]) + + const result = await executeBackfill({ + executor: createMockExecutor(statuses), + planId: PLAN_ID, + chunks, + buildQuery: (c) => `INSERT INTO t SELECT * FROM s WHERE d >= '${c.from}' AND d < '${c.to}'`, + concurrency: 2, + pollIntervalMs: 10, + }) + + expect(result.total).toBe(2) + expect(result.completed).toBe(2) + expect(result.failed).toBe(0) + expect(result.progress.c1.status).toBe('done') + expect(result.progress.c2.status).toBe('done') + expect(result.progress.c1.writtenRows).toBe(100) + }) + + test('reports failed chunks', async () => { + const statuses = new Map([ + [`backfill-${PLAN_ID}-c1`, [{ status: 'failed', error: 'OOM', durationMs: 50 }]], + [`backfill-${PLAN_ID}-c2`, [{ status: 'finished', writtenRows: 10, writtenBytes: 40, durationMs: 30 }]], + ]) + + const result = await executeBackfill({ + executor: createMockExecutor(statuses), + planId: PLAN_ID, + chunks, + buildQuery: () => 'SELECT 1', + concurrency: 2, + pollIntervalMs: 10, + }) + + expect(result.completed).toBe(1) + expect(result.failed).toBe(1) + expect(result.progress.c1.status).toBe('failed') + expect(result.progress.c1.error).toBe('OOM') + expect(result.progress.c2.status).toBe('done') + }) + + test('respects concurrency limit', async () => { + const submitOrder: string[] = [] + const statuses = new Map([ + [`backfill-${PLAN_ID}-c1`, [{ status: 'running' }, { status: 'finished', writtenRows: 1, writtenBytes: 1, durationMs: 1 }]], + [`backfill-${PLAN_ID}-c2`, [{ status: 'finished', writtenRows: 1, writtenBytes: 1, durationMs: 1 }]], + ]) + + const executor = createMockExecutor(statuses) + const originalSubmit = executor.submit.bind(executor) + executor.submit = async (sql: string, queryId?: string) => { + const id = await originalSubmit(sql, queryId) + submitOrder.push(id) + return id + } + + const result = await executeBackfill({ + executor, + planId: PLAN_ID, + chunks, + buildQuery: () => 'SELECT 1', + concurrency: 1, + pollIntervalMs: 10, + }) + + expect(result.total).toBe(2) + expect(result.completed).toBe(2) + expect(submitOrder[0]).toBe(`backfill-${PLAN_ID}-c1`) + expect(submitOrder[1]).toBe(`backfill-${PLAN_ID}-c2`) + }) + + test('calls onProgress on state changes', async () => { + const statuses = new Map([ + [`backfill-${PLAN_ID}-c1`, [{ status: 'finished', writtenRows: 1, writtenBytes: 1, durationMs: 1 }]], + ]) + + const progressSnapshots: BackfillProgress[] = [] + + await executeBackfill({ + executor: createMockExecutor(statuses), + planId: PLAN_ID, + chunks: [chunks[0]], + buildQuery: () => 'SELECT 1', + pollIntervalMs: 10, + onProgress: (p) => { progressSnapshots.push({ ...p }) }, + }) + + expect(progressSnapshots.length).toBeGreaterThanOrEqual(1) + const lastSnapshot = progressSnapshots[progressSnapshots.length - 1] + expect(lastSnapshot.c1.status).toBe('done') + }) + + test('resumes from saved progress', async () => { + const queryId = `backfill-${PLAN_ID}-c2` + const statuses = new Map([ + [queryId, [{ status: 'finished', writtenRows: 50, writtenBytes: 250, durationMs: 100 }]], + ]) + + const resumeFrom: BackfillProgress = { + c1: { status: 'done', queryId: `backfill-${PLAN_ID}-c1`, writtenRows: 100 }, + c2: { status: 'submitted', queryId, submittedAt: '2024-01-01T00:00:00Z' }, + } + + const result = await executeBackfill({ + executor: createMockExecutor(statuses), + planId: PLAN_ID, + chunks, + buildQuery: () => 'SELECT 1', + pollIntervalMs: 10, + resumeFrom, + }) + + expect(result.completed).toBe(2) + expect(result.failed).toBe(0) + expect(result.progress.c1.status).toBe('done') + expect(result.progress.c1.writtenRows).toBe(100) + }) + + test('handles transient poll errors gracefully', async () => { + let callCount = 0 + const executor = createMockExecutor(new Map()) + executor.queryStatus = async (): Promise => { + callCount++ + if (callCount <= 2) throw new Error('ECONNRESET') + return { status: 'finished', writtenRows: 10, writtenBytes: 40, durationMs: 30 } + } + + const result = await executeBackfill({ + executor, + planId: PLAN_ID, + chunks: [chunks[0]], + buildQuery: () => 'SELECT 1', + pollIntervalMs: 10, + maxPollErrors: 5, + }) + + expect(result.completed).toBe(1) + expect(result.failed).toBe(0) + expect(result.progress.c1.status).toBe('done') + }) + + test('fails chunk after max consecutive poll errors', async () => { + const executor = createMockExecutor(new Map()) + executor.queryStatus = async (): Promise => { + throw new Error('ETIMEDOUT') + } + + const result = await executeBackfill({ + executor, + planId: PLAN_ID, + chunks: [chunks[0]], + buildQuery: () => 'SELECT 1', + pollIntervalMs: 10, + maxPollErrors: 3, + }) + + expect(result.completed).toBe(0) + expect(result.failed).toBe(1) + expect(result.progress.c1.status).toBe('failed') + expect(result.progress.c1.error).toContain('3 consecutive poll errors') + }) + + test('replayFailed resets failed chunks after sync', async () => { + const queryId = `backfill-${PLAN_ID}-c1` + const statuses = new Map([ + [queryId, [{ status: 'finished', writtenRows: 42, writtenBytes: 200, durationMs: 80 }]], + [`backfill-${PLAN_ID}-c2`, [{ status: 'finished', writtenRows: 10, writtenBytes: 50, durationMs: 20 }]], + ]) + + const resumeFrom: BackfillProgress = { + c1: { status: 'failed', queryId, error: 'OOM' }, + c2: { status: 'done', queryId: `backfill-${PLAN_ID}-c2`, writtenRows: 10 }, + } + + const result = await executeBackfill({ + executor: createMockExecutor(statuses), + planId: PLAN_ID, + chunks, + buildQuery: () => 'SELECT 1', + pollIntervalMs: 10, + resumeFrom, + replayFailed: true, + }) + + expect(result.completed).toBe(2) + expect(result.failed).toBe(0) + expect(result.progress.c1.status).toBe('done') + expect(result.progress.c1.writtenRows).toBe(42) + }) +}) + +describe('syncProgress', () => { + const chunks = [ + { id: 'c1' }, + { id: 'c2' }, + { id: 'c3' }, + ] + + test('discovers submitted-but-untracked queries from server', async () => { + const executor = createMockExecutor(new Map()) + // Mock query() to return server-side state + executor.query = async (sql: string): Promise => { + if (sql.includes('system.processes')) { + return [{ query_id: `backfill-${PLAN_ID}-c2` }] as T[] + } + if (sql.includes('system.query_log')) { + return [{ + query_id: `backfill-${PLAN_ID}-c1`, + type: 'QueryFinish', + written_rows: '500', + written_bytes: '2000', + query_duration_ms: '150', + exception: '', + }] as T[] + } + return [] as T[] + } + + const progress: BackfillProgress = { + c1: { status: 'pending' }, + c2: { status: 'pending' }, + c3: { status: 'pending' }, + } + + const synced = await syncProgress(executor, PLAN_ID, chunks, progress) + + // c1 was found completed in query_log + expect(synced.c1.status).toBe('done') + expect(synced.c1.writtenRows).toBe(500) + // c2 was found running in system.processes + expect(synced.c2.status).toBe('running') + expect(synced.c2.queryId).toBe(`backfill-${PLAN_ID}-c2`) + // c3 had no server state — stays pending + expect(synced.c3.status).toBe('pending') + }) + + test('does not downgrade done chunks', async () => { + const executor = createMockExecutor(new Map()) + executor.query = async (): Promise => [] as T[] + + const progress: BackfillProgress = { + c1: { status: 'done', queryId: `backfill-${PLAN_ID}-c1`, writtenRows: 100 }, + } + + const synced = await syncProgress(executor, PLAN_ID, [{ id: 'c1' }], progress) + expect(synced.c1.status).toBe('done') + expect(synced.c1.writtenRows).toBe(100) + }) + + test('updates failed server state for locally submitted chunk', async () => { + const executor = createMockExecutor(new Map()) + executor.query = async (sql: string): Promise => { + if (sql.includes('system.processes')) return [] as T[] + if (sql.includes('system.query_log')) { + return [{ + query_id: `backfill-${PLAN_ID}-c1`, + type: 'ExceptionWhileProcessing', + written_rows: '0', + written_bytes: '0', + query_duration_ms: '10', + exception: 'Memory limit exceeded', + }] as T[] + } + return [] as T[] + } + + const progress: BackfillProgress = { + c1: { status: 'submitted', queryId: `backfill-${PLAN_ID}-c1`, submittedAt: '2024-01-01T00:00:00Z' }, + } + + const synced = await syncProgress(executor, PLAN_ID, [{ id: 'c1' }], progress) + expect(synced.c1.status).toBe('failed') + expect(synced.c1.error).toBe('Memory limit exceeded') + }) +}) diff --git a/packages/plugin-backfill/src/async-backfill.ts b/packages/plugin-backfill/src/async-backfill.ts new file mode 100644 index 0000000..7c92260 --- /dev/null +++ b/packages/plugin-backfill/src/async-backfill.ts @@ -0,0 +1,346 @@ +import type { ClickHouseExecutor, QueryStatus } from '@chkit/clickhouse' +import pMap from 'p-map' + +export interface BackfillOptions { + /** The executor to submit queries to (target ClickHouse) */ + executor: ClickHouseExecutor + /** Plan ID used as a namespace in deterministic query IDs */ + planId: string + /** The chunks to process (from buildChunks) */ + chunks: Array<{ id: string; from: string; to: string; [key: string]: unknown }> + /** Build the SQL for a given chunk. Called once per chunk at submit time. */ + buildQuery: (chunk: { id: string; from: string; to: string }) => string + /** Max concurrent queries running on the server. Default: 3 */ + concurrency?: number + /** Polling interval in ms. Default: 5000 */ + pollIntervalMs?: number + /** Max consecutive poll errors before marking a chunk as failed. Default: 10 */ + maxPollErrors?: number + /** Called whenever progress changes. Use this to persist state. */ + onProgress?: (progress: BackfillProgress) => void | Promise + /** Previously saved progress to resume from. */ + resumeFrom?: BackfillProgress + /** When true, reset chunks confirmed failed (both locally and on server) back to pending. */ + replayFailed?: boolean +} + +export interface BackfillChunkState { + status: 'pending' | 'submitted' | 'running' | 'done' | 'failed' + queryId?: string + submittedAt?: string + finishedAt?: string + durationMs?: number + writtenRows?: number + writtenBytes?: number + error?: string +} + +export type BackfillProgress = Record + +export interface BackfillResult { + total: number + completed: number + failed: number + progress: BackfillProgress +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +/** Build the deterministic query ID for a chunk. */ +function chunkQueryId(planId: string, chunkId: string): string { + return `backfill-${planId}-${chunkId}` +} + +function applyQueryStatus( + state: BackfillChunkState, + qs: QueryStatus, +): { state: BackfillChunkState; changed: boolean } { + if (qs.status === 'running') { + return { state: { ...state, status: 'running' }, changed: state.status !== 'running' } + } + if (qs.status === 'finished') { + return { + state: { + ...state, + status: 'done', + finishedAt: new Date().toISOString(), + durationMs: qs.durationMs, + writtenRows: qs.writtenRows, + writtenBytes: qs.writtenBytes, + }, + changed: true, + } + } + if (qs.status === 'failed') { + return { + state: { + ...state, + status: 'failed', + finishedAt: new Date().toISOString(), + durationMs: qs.durationMs, + error: qs.error, + }, + changed: true, + } + } + // 'unknown' — leave status as-is (query_log may not have flushed yet) + return { state, changed: false } +} + +function getChunk(progress: BackfillProgress, id: string): BackfillChunkState { + const state = progress[id] + if (!state) throw new Error(`No progress entry for chunk ${id}`) + return state +} + +function updateChunk( + progress: BackfillProgress, + id: string, + next: BackfillChunkState, +): BackfillProgress { + return { ...progress, [id]: next } +} + +/** + * Reconcile local progress with server-side state. + * + * Queries system.processes and system.query_log for all chunk query IDs + * to discover queries that were submitted but whose status was never + * persisted locally (e.g. client crash between submit and state write). + */ +export async function syncProgress( + executor: ClickHouseExecutor, + planId: string, + chunks: Array<{ id: string }>, + progress: BackfillProgress, +): Promise { + const prefix = `backfill-${planId}-` + + // Collect query IDs for non-terminal chunks that need reconciliation + const chunkIdsToSync: string[] = [] + for (const chunk of chunks) { + const state = progress[chunk.id] + if (!state || state.status === 'done') continue + chunkIdsToSync.push(chunk.id) + } + + if (chunkIdsToSync.length === 0) return progress + + // Escape single-quotes in the prefix for safe SQL embedding + const safePrefix = prefix.replace(/'/g, "''").replace(/%/g, '\\%').replace(/_/g, '\\_') + + const runningRows = await executor.query<{ query_id: string }>( + `SELECT query_id FROM clusterAllReplicas('parallel_replicas', system.processes) WHERE query_id LIKE '${safePrefix}%' SETTINGS skip_unavailable_shards = 1` + ) + const runningSet = new Set(runningRows.map((r) => r.query_id)) + + const logRows = await executor.query<{ + query_id: string + type: string + written_rows: string + written_bytes: string + query_duration_ms: string + exception: string + }>( + `SELECT query_id, type, written_rows, written_bytes, query_duration_ms, exception +FROM clusterAllReplicas('parallel_replicas', system.query_log) +WHERE query_id LIKE '${safePrefix}%' + AND type IN ('QueryFinish', 'ExceptionWhileProcessing') + AND is_initial_query = 1 +ORDER BY event_time DESC +SETTINGS skip_unavailable_shards = 1` + ) + + // Deduplicate: take the latest log entry per query_id (results are ordered by event_time DESC) + const latestLogByQueryId = new Map() + for (const row of logRows) { + if (!latestLogByQueryId.has(row.query_id)) { + latestLogByQueryId.set(row.query_id, row) + } + } + + let updated = { ...progress } + + for (const chunkId of chunkIdsToSync) { + const queryId = chunkQueryId(planId, chunkId) + const current = updated[chunkId] + if (!current) continue + + if (runningSet.has(queryId)) { + updated = updateChunk(updated, chunkId, { ...current, status: 'running', queryId }) + } else { + const logEntry = latestLogByQueryId.get(queryId) + if (logEntry) { + if (logEntry.type === 'QueryFinish') { + updated = updateChunk(updated, chunkId, { + ...current, + status: 'done', + queryId, + finishedAt: new Date().toISOString(), + writtenRows: Number(logEntry.written_rows), + writtenBytes: Number(logEntry.written_bytes), + durationMs: Number(logEntry.query_duration_ms), + }) + } else { + updated = updateChunk(updated, chunkId, { + ...current, + status: 'failed', + queryId, + finishedAt: new Date().toISOString(), + durationMs: Number(logEntry.query_duration_ms), + error: logEntry.exception, + }) + } + } + } + } + + return updated +} + +async function pollChunk( + executor: ClickHouseExecutor, + initial: BackfillChunkState, + pollIntervalMs: number, + maxPollErrors: number, + onChanged: (state: BackfillChunkState) => void | Promise, +): Promise { + let state = initial + let consecutiveErrors = 0 + while (state.status === 'submitted' || state.status === 'running') { + await sleep(pollIntervalMs) + if (!state.queryId) break + let qs: QueryStatus + try { + qs = await executor.queryStatus(state.queryId, { + afterTime: state.submittedAt, + }) + } catch { + consecutiveErrors++ + if (consecutiveErrors >= maxPollErrors) { + state = { + ...state, + status: 'failed', + finishedAt: new Date().toISOString(), + error: `Lost contact with query after ${consecutiveErrors} consecutive poll errors`, + } + await onChanged(state) + break + } + continue + } + consecutiveErrors = 0 + const result = applyQueryStatus(state, qs) + if (result.changed) { + state = result.state + await onChanged(state) + } + } + return state +} + +export async function executeBackfill(options: BackfillOptions): Promise { + const { + executor, + planId, + chunks, + buildQuery, + concurrency = 3, + pollIntervalMs = 5000, + maxPollErrors = 10, + onProgress, + resumeFrom, + replayFailed, + } = options + + let progress: BackfillProgress = Object.fromEntries( + chunks.map((chunk) => { + const resumed = resumeFrom?.[chunk.id] + return [chunk.id, resumed ? { ...resumed } : { status: 'pending' as const }] + }), + ) + + // When resuming, reconcile local state with the server before processing. + // This catches queries that were submitted but whose status was never + // persisted (e.g. client crash between submit() and state file write). + if (resumeFrom) { + progress = await syncProgress(executor, planId, chunks, progress) + } + + // Reset confirmed-failed chunks to pending AFTER sync so we operate on + // ground truth. The deterministic query_id is reused; the afterTime filter + // in queryStatus ensures we ignore stale query_log entries from prior attempts. + if (replayFailed) { + // Stamp submittedAt with a 60s buffer so the afterTime filter in + // queryStatus ignores stale query_log entries from the prior failed + // attempt while tolerating clock skew between client and server. + const replayAfterTime = new Date(Date.now() - 60_000).toISOString() + for (const chunk of chunks) { + const state = progress[chunk.id] + if (state?.status === 'failed') { + progress = updateChunk(progress, chunk.id, { status: 'pending', submittedAt: replayAfterTime }) + } + } + } + + // Persist the reconciled state so the caller's checkpoint is up to date + if (resumeFrom || replayFailed) { + await onProgress?.(progress) + } + + const setChunk = (id: string, next: BackfillChunkState) => { + progress = updateChunk(progress, id, next) + return onProgress?.(progress) + } + + await pMap( + chunks, + async (chunk) => { + const state = getChunk(progress, chunk.id) + + // Already terminal from a previous run + if (state.status === 'done' || state.status === 'failed') return + + // Resumed in-flight: poll to completion + if (state.status === 'submitted' || state.status === 'running') { + if (!state.queryId) { + await setChunk(chunk.id, { ...state, status: 'pending' }) + } else { + await pollChunk(executor, state, pollIntervalMs, maxPollErrors, (s) => setChunk(chunk.id, s)) + return + } + } + + // Submit and poll + // submittedAt is intentionally omitted on first submission — it's only + // used as an afterTime filter to ignore stale query_log entries when + // replaying a previously failed chunk with the same deterministic query_id. + // Setting it to local time here would cause clock-skew issues with the + // ClickHouse server, making the filter exclude valid entries. + const queryId = chunkQueryId(planId, chunk.id) + const sql = buildQuery(chunk) + await executor.submit(sql, queryId) + const submitted: BackfillChunkState = { + ...getChunk(progress, chunk.id), + status: 'submitted', + queryId, + } + await setChunk(chunk.id, submitted) + + await pollChunk(executor, submitted, pollIntervalMs, maxPollErrors, (s) => setChunk(chunk.id, s)) + }, + { concurrency }, + ) + + const completed = chunks.filter((c) => getChunk(progress, c.id).status === 'done').length + const failed = chunks.filter((c) => getChunk(progress, c.id).status === 'failed').length + + return { + total: chunks.length, + completed, + failed, + progress, + } +} diff --git a/packages/plugin-backfill/src/executor.ts b/packages/plugin-backfill/src/executor.ts deleted file mode 100644 index ea106fe..0000000 --- a/packages/plugin-backfill/src/executor.ts +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Generic work-item execution engine with retries, exponential backoff, - * and AbortSignal support. Agnostic to backfill state, SQL, or ClickHouse — - * it simply iterates a list of items and calls an `execute` function for each. - */ - -export interface WorkItem { - id: string - status: 'pending' | 'running' | 'done' | 'failed' | 'skipped' - attempts: number -} - -export interface ExecutorOptions { - maxRetries: number - retryDelayMs: number - // maxParallelChunks: number // TODO: future concurrency support -} - -export type ProgressEvent = 'item_started' | 'item_done' | 'item_retry' | 'item_failed' - -export interface ExecutorHooks { - onProgress?: (item: T, event: ProgressEvent, meta?: { error?: string; attempt?: number; nextAttempt?: number }) => Promise -} - -export interface ExecutorResult { - completed: T[] - failed: T[] - aborted: boolean -} - -function sleep(ms: number, signal?: AbortSignal): Promise { - return new Promise((resolve) => { - if (signal?.aborted) { - resolve() - return - } - const timer = setTimeout(resolve, ms) - signal?.addEventListener('abort', () => { - clearTimeout(timer) - resolve() - }, { once: true }) - }) -} - -async function executeItem( - item: T, - execute: (item: T, signal?: AbortSignal) => Promise, - options: ExecutorOptions, - hooks?: ExecutorHooks, - signal?: AbortSignal, -): Promise { - while (item.attempts < options.maxRetries) { - if (signal?.aborted) return false - - item.status = 'running' - item.attempts += 1 - - await hooks?.onProgress?.(item, 'item_started', { attempt: item.attempts }) - - let error: string | undefined - - try { - await execute(item, signal) - } catch (err) { - error = err instanceof Error ? err.message : String(err) - } - - if (!error) { - item.status = 'done' - await hooks?.onProgress?.(item, 'item_done', { attempt: item.attempts }) - return true - } - - if (item.attempts >= options.maxRetries) { - item.status = 'failed' - await hooks?.onProgress?.(item, 'item_failed', { error, attempt: item.attempts }) - return false - } - - item.status = 'pending' - await hooks?.onProgress?.(item, 'item_retry', { error, attempt: item.attempts, nextAttempt: item.attempts + 1 }) - - if (options.retryDelayMs > 0) { - const delay = options.retryDelayMs * 2 ** (item.attempts - 1) - await sleep(delay, signal) - } - } - - // Should only be reached when maxRetries <= 0 - item.status = 'failed' - return false -} - -export async function executeWorkItems( - items: T[], - execute: (item: T, signal?: AbortSignal) => Promise, - options: ExecutorOptions, - hooks?: ExecutorHooks, - signal?: AbortSignal, -): Promise> { - const completed: T[] = [] - const failed: T[] = [] - let aborted = false - - for (const item of items) { - if (signal?.aborted) { - aborted = true - break - } - - const ok = await executeItem(item, execute, options, hooks, signal) - if (ok) { - completed.push(item) - } else if (signal?.aborted) { - aborted = true - break - } else { - failed.push(item) - } - } - - return { completed, failed, aborted } -} diff --git a/packages/plugin-backfill/src/index.ts b/packages/plugin-backfill/src/index.ts index c2c8446..a781305 100644 --- a/packages/plugin-backfill/src/index.ts +++ b/packages/plugin-backfill/src/index.ts @@ -1,6 +1,13 @@ import './table-config.js' export { backfill, createBackfillPlugin } from './plugin.js' +export { executeBackfill, syncProgress } from './async-backfill.js' +export type { + BackfillOptions, + BackfillChunkState, + BackfillProgress, + BackfillResult, +} from './async-backfill.js' export type { BackfillPlugin, BackfillPluginOptions, BackfillPluginRegistration } from './types.js' export type { PluginConfig } from './options.js' export type { BackfillTableConfig } from './table-config.js' diff --git a/packages/plugin-backfill/src/options.test.ts b/packages/plugin-backfill/src/options.test.ts index 46d8faa..8f10ffe 100644 --- a/packages/plugin-backfill/src/options.test.ts +++ b/packages/plugin-backfill/src/options.test.ts @@ -84,27 +84,20 @@ describe('RunSchema defaults', () => { test('applies documented defaults', () => { const opts = RunSchema.parse({ planId: 'abc123def456789a' }) - expect(opts.replayDone).toBe(false) - expect(opts.replayFailed).toBe(false) - expect(opts.forceOverlap).toBe(false) - expect(opts.forceCompatibility).toBe(false) expect(opts.forceEnvironment).toBe(false) - expect(opts.maxRetriesPerChunk).toBe(3) - expect(opts.retryDelayMs).toBe(1000) - expect(opts.maxParallelChunks).toBe(1) - expect(opts.blockOverlappingRuns).toBe(true) - expect(opts.requireDryRunBeforeRun).toBe(true) - expect(opts.simulateFailCount).toBe(1) + expect(opts.concurrency).toBe(3) + expect(opts.pollIntervalMs).toBe(5000) }) }) describe('ResumeSchema', () => { - test('omits simulation fields', () => { + test('extends RunSchema with replayFailed', () => { const opts = ResumeSchema.parse({ planId: 'abc123def456789a' }) - expect(opts).not.toHaveProperty('simulateFailChunk') - expect(opts).not.toHaveProperty('simulateFailCount') - expect(opts.replayDone).toBe(false) + expect(opts.forceEnvironment).toBe(false) + expect(opts.concurrency).toBe(3) + expect(opts.pollIntervalMs).toBe(5000) + expect(opts.replayFailed).toBe(false) }) }) @@ -131,13 +124,13 @@ describe('resolveOptions', () => { test('runtime options override plugin config', () => { const opts = resolveOptions( RunSchema, - { maxRetriesPerChunk: 2 }, - { maxRetriesPerChunk: 5 }, + { concurrency: 2 }, + { concurrency: 5 }, { '--plan-id': 'abc123def456789a' }, { '--plan-id': { key: 'planId', coerce: (v: string) => v } } ) - expect(opts.maxRetriesPerChunk).toBe(5) + expect(opts.concurrency).toBe(5) }) test('schema defaults apply when no override provided', () => { @@ -149,9 +142,9 @@ describe('resolveOptions', () => { { '--plan-id': { key: 'planId', coerce: (v: string) => v } } ) - expect(opts.maxRetriesPerChunk).toBe(3) - expect(opts.retryDelayMs).toBe(1000) - expect(opts.blockOverlappingRuns).toBe(true) + expect(opts.concurrency).toBe(3) + expect(opts.pollIntervalMs).toBe(5000) + expect(opts.forceEnvironment).toBe(false) }) test('throws on invalid options', () => { diff --git a/packages/plugin-backfill/src/options.ts b/packages/plugin-backfill/src/options.ts index e671325..11120cd 100644 --- a/packages/plugin-backfill/src/options.ts +++ b/packages/plugin-backfill/src/options.ts @@ -107,31 +107,15 @@ export type PlanOptions = z.infer export const RunSchema = z.object({ planId: z.string(), - replayDone: z.boolean().default(false), - replayFailed: z.boolean().default(false), - forceOverlap: z.boolean().default(false), - forceCompatibility: z.boolean().default(false), forceEnvironment: z.boolean().default(false), - simulateFailChunk: z.string().optional(), - simulateFailCount: z.number().int().positive().default(1), - maxRetriesPerChunk: z.number().int().positive().default(3), - retryDelayMs: z.number().nonnegative().default(1000), - maxParallelChunks: z.number().int().positive().default(1), - maxChunkBytes: z.number().positive().default(10 * GiB), - requireIdempotencyToken: z.boolean().default(true), - blockOverlappingRuns: z.boolean().default(true), - requireDryRunBeforeRun: z.boolean().default(true), - requireExplicitWindow: z.boolean().default(true), - failCheckOnRequiredPendingBackfill: z.boolean().default(true), - maxWindowHours: z.number().positive().default(720), - minChunkMinutes: z.number().positive().default(15), + concurrency: z.number().int().positive().default(3), + pollIntervalMs: z.number().nonnegative().default(5000), stateDir: z.string().min(1).optional(), }) export type RunOptions = z.infer -export const ResumeSchema = RunSchema.omit({ - simulateFailChunk: true, - simulateFailCount: true, +export const ResumeSchema = RunSchema.extend({ + replayFailed: z.boolean().default(false), }) export type ResumeOptions = z.infer @@ -153,21 +137,6 @@ export const CheckSchema = z.object({ }) export type CheckOptions = z.infer -/** Fields used by computeCompatibilityToken — shared by RunOptions and ResumeOptions. */ -export type CompatOptions = Pick< - RunOptions, - | 'maxChunkBytes' - | 'maxParallelChunks' - | 'maxRetriesPerChunk' - | 'requireIdempotencyToken' - | 'requireDryRunBeforeRun' - | 'requireExplicitWindow' - | 'blockOverlappingRuns' - | 'failCheckOnRequiredPendingBackfill' - | 'maxWindowHours' - | 'minChunkMinutes' -> - // ───── CLI flag definitions ───── export const PLAN_FLAGS = defineFlags([ @@ -179,22 +148,17 @@ export const PLAN_FLAGS = defineFlags([ export const RUN_FLAGS = defineFlags([ { name: '--plan-id', type: 'string', description: 'Plan ID to execute', placeholder: '' }, - { name: '--replay-done', type: 'boolean', description: 'Re-execute already completed chunks' }, - { name: '--replay-failed', type: 'boolean', description: 'Re-execute failed chunks' }, - { name: '--force-overlap', type: 'boolean', description: 'Allow overlapping runs' }, - { name: '--force-compatibility', type: 'boolean', description: 'Skip compatibility checks' }, { name: '--force-environment', type: 'boolean', description: 'Skip environment mismatch checks' }, - { name: '--simulate-fail-chunk', type: 'string', description: 'Simulate failure on chunk', placeholder: '' }, - { name: '--simulate-fail-count', type: 'string', description: 'Number of simulated failures', placeholder: '' }, + { name: '--concurrency', type: 'string', description: 'Max concurrent async queries', placeholder: '' }, + { name: '--poll-interval', type: 'string', description: 'Polling interval in ms', placeholder: '' }, ] as const) export const RESUME_FLAGS = defineFlags([ { name: '--plan-id', type: 'string', description: 'Plan ID to resume', placeholder: '' }, - { name: '--replay-done', type: 'boolean', description: 'Re-execute already completed chunks' }, - { name: '--replay-failed', type: 'boolean', description: 'Re-execute failed chunks' }, - { name: '--force-overlap', type: 'boolean', description: 'Allow overlapping runs' }, - { name: '--force-compatibility', type: 'boolean', description: 'Skip compatibility checks' }, { name: '--force-environment', type: 'boolean', description: 'Skip environment mismatch checks' }, + { name: '--concurrency', type: 'string', description: 'Max concurrent async queries', placeholder: '' }, + { name: '--poll-interval', type: 'string', description: 'Polling interval in ms', placeholder: '' }, + { name: '--replay-failed', type: 'boolean', description: 'Re-execute failed chunks' }, ] as const) export const PLAN_ID_FLAGS = defineFlags([ @@ -210,35 +174,27 @@ const PLAN_FLAG_MAP: FlagMapping = { '--max-chunk-bytes': { key: 'maxChunkBytes', coerce: parseByteSize }, } +function coercePositiveInt(v: string, flag: string): number { + const n = Number(v) + if (!Number.isFinite(n) || n <= 0 || !Number.isInteger(n)) { + throw new BackfillConfigError(`Invalid value for ${flag}. Expected integer > 0.`) + } + return n +} + const RUN_FLAG_MAP: FlagMapping = { '--plan-id': { key: 'planId', coerce: normalizePlanId }, - '--replay-done': { key: 'replayDone' }, - '--replay-failed': { key: 'replayFailed' }, - '--force-overlap': { key: 'forceOverlap' }, - '--force-compatibility': { key: 'forceCompatibility' }, '--force-environment': { key: 'forceEnvironment' }, - '--simulate-fail-chunk': { key: 'simulateFailChunk' }, - '--simulate-fail-count': { - key: 'simulateFailCount', - coerce: (v) => { - const n = Number(v) - if (!Number.isFinite(n) || n <= 0 || !Number.isInteger(n)) { - throw new BackfillConfigError( - 'Invalid value for --simulate-fail-count. Expected integer > 0.' - ) - } - return n - }, - }, + '--concurrency': { key: 'concurrency', coerce: (v) => coercePositiveInt(v, '--concurrency') }, + '--poll-interval': { key: 'pollIntervalMs', coerce: (v) => coercePositiveInt(v, '--poll-interval') }, } const RESUME_FLAG_MAP: FlagMapping = { '--plan-id': { key: 'planId', coerce: normalizePlanId }, - '--replay-done': { key: 'replayDone' }, - '--replay-failed': { key: 'replayFailed' }, - '--force-overlap': { key: 'forceOverlap' }, - '--force-compatibility': { key: 'forceCompatibility' }, '--force-environment': { key: 'forceEnvironment' }, + '--concurrency': { key: 'concurrency', coerce: (v) => coercePositiveInt(v, '--concurrency') }, + '--poll-interval': { key: 'pollIntervalMs', coerce: (v) => coercePositiveInt(v, '--poll-interval') }, + '--replay-failed': { key: 'replayFailed' }, } const PLAN_ID_FLAG_MAP: FlagMapping = { diff --git a/packages/plugin-backfill/src/payload.ts b/packages/plugin-backfill/src/payload.ts index 570df70..f17e096 100644 --- a/packages/plugin-backfill/src/payload.ts +++ b/packages/plugin-backfill/src/payload.ts @@ -3,7 +3,6 @@ import type { BackfillPlanStatus, BackfillStatusSummary, BuildBackfillPlanOutput, - ExecuteBackfillRunOutput, } from './types.js' export function planPayload(output: BuildBackfillPlanOutput): { @@ -40,44 +39,14 @@ export function planPayload(output: BuildBackfillPlanOutput): { } } -export function runPayload(output: ExecuteBackfillRunOutput): { - ok: boolean - command: 'run' | 'resume' - planId: string - status: BackfillPlanStatus - chunkCounts: BackfillStatusSummary['totals'] - attempts: number - rowsWritten: number - runPath: string - eventPath: string - lastError?: string - noop?: boolean -} { - return { - ok: output.status.status === 'completed', - command: 'run', - planId: output.run.planId, - status: output.status.status, - chunkCounts: output.status.totals, - attempts: output.status.attempts, - rowsWritten: output.status.rowsWritten, - runPath: output.runPath, - eventPath: output.eventPath, - lastError: output.status.lastError, - noop: output.noop, - } -} - export function statusPayload(summary: BackfillStatusSummary): { ok: boolean command: 'status' planId: string status: BackfillPlanStatus chunkCounts: BackfillStatusSummary['totals'] - attempts: number rowsWritten: number runPath: string - eventPath: string updatedAt: string lastError?: string } { @@ -87,10 +56,8 @@ export function statusPayload(summary: BackfillStatusSummary): { planId: summary.planId, status: summary.status, chunkCounts: summary.totals, - attempts: summary.attempts, rowsWritten: summary.rowsWritten, runPath: summary.runPath, - eventPath: summary.eventPath, updatedAt: summary.updatedAt, lastError: summary.lastError, } @@ -103,7 +70,6 @@ export function cancelPayload(summary: BackfillStatusSummary): { status: BackfillPlanStatus chunkCounts: BackfillStatusSummary['totals'] runPath: string - eventPath: string } { return { ok: summary.status === 'cancelled', @@ -112,7 +78,6 @@ export function cancelPayload(summary: BackfillStatusSummary): { status: summary.status, chunkCounts: summary.totals, runPath: summary.runPath, - eventPath: summary.eventPath, } } diff --git a/packages/plugin-backfill/src/plugin.ts b/packages/plugin-backfill/src/plugin.ts index 3e5a12d..53079d8 100644 --- a/packages/plugin-backfill/src/plugin.ts +++ b/packages/plugin-backfill/src/plugin.ts @@ -1,6 +1,7 @@ import { createClickHouseExecutor } from '@chkit/clickhouse' import { wrapPluginRun } from '@chkit/core' +import { executeBackfill, type BackfillProgress } from './async-backfill.js' import { BackfillConfigError } from './errors.js' import { PLAN_FLAGS, @@ -15,15 +16,23 @@ import { resolveStatusOptions, type PluginConfig, } from './options.js' -import { planPayload, runPayload, statusPayload, cancelPayload, doctorPayload } from './payload.js' +import { planPayload, statusPayload, cancelPayload, doctorPayload } from './payload.js' import { buildBackfillPlan } from './planner.js' import { evaluateBackfillCheck } from './check.js' import { cancelBackfillRun, getBackfillDoctorReport, getBackfillStatus } from './queries.js' -import { executeBackfillRun, resumeBackfillRun } from './runtime.js' +import { + backfillPaths, + ensureEnvironmentMatch, + nowIso, + readPlan, + readRun, + summarizeRunStatus, + writeJson, +} from './state.js' import type { BackfillPlugin, BackfillPluginRegistration, - ExecuteBackfillRunOutput, + BackfillRunState, } from './types.js' function formatBytes(bytes: number): string { @@ -34,40 +43,129 @@ function formatBytes(bytes: number): string { return `${bytes} B` } -type BackfillCommandContext = Parameters[0] +async function runBackfill(input: { + planId: string + forceEnvironment: boolean + concurrency: number + pollIntervalMs: number + stateDir?: string + resumeFrom?: BackfillProgress + replayFailed?: boolean + configPath: string + config: Parameters[0]['config'] + clickhouse: NonNullable[0]> + print: (value: unknown) => void + jsonMode: boolean +}): Promise { + const { plan, stateDir } = await readPlan({ + planId: input.planId, + configPath: input.configPath, + config: input.config, + stateDir: input.stateDir, + }) -function formatRunOutput( - output: ExecuteBackfillRunOutput, - command: string, - context: Pick, -): number { - const payload = { - ...runPayload(output), - command, - } - if (payload.noop) { - if (!context.jsonMode) { - context.print( - `Plan ${payload.planId} is already completed (${payload.chunkCounts.done}/${payload.chunkCounts.total} chunks done). Nothing to do.` + ensureEnvironmentMatch({ + plan, + clickhouse: input.clickhouse, + forceEnvironment: input.forceEnvironment, + }) + + const paths = backfillPaths(stateDir, plan.planId) + + // Check for existing run state + const existingRun = await readRun(paths.runPath) + const resumeFrom = input.resumeFrom + + if (existingRun && !resumeFrom) { + // `run` command (no resumeFrom) must not silently continue an existing run. + // Users should use `backfill resume` instead. + const status = existingRun.status + if (status === 'completed') { + throw new BackfillConfigError( + `Run already completed for plan ${plan.planId}. Nothing to do.` ) - } else { - context.print(payload) } - return 0 - } - if (context.jsonMode) { - context.print(payload) - } else { - let line = `Backfill ${command} ${payload.planId}: ${payload.status} (done=${payload.chunkCounts.done}/${payload.chunkCounts.total}, ${payload.rowsWritten} rows written)` - if (payload.lastError) line += ` \u2014 ${payload.lastError}` - context.print(line) - if (payload.status === 'completed' && payload.rowsWritten === 0) { - context.print( - 'Warning: 0 rows written across all chunks. Verify that source data exists in the time range and passes the query\'s WHERE filters.' + if (status === 'cancelled') { + throw new BackfillConfigError( + `Run is cancelled for plan ${plan.planId}. Create a new plan or inspect with backfill doctor.` ) } + throw new BackfillConfigError( + `A run already exists for plan ${plan.planId} (status: ${status}). Use backfill resume to continue.` + ) + } + + const db = createClickHouseExecutor(input.clickhouse) + + try { + const runState: BackfillRunState = { + planId: plan.planId, + target: plan.target, + status: 'running', + startedAt: existingRun?.startedAt ?? nowIso(), + updatedAt: nowIso(), + progress: resumeFrom ?? {}, + } + + await writeJson(paths.runPath, runState) + + const result = await executeBackfill({ + executor: db, + planId: plan.planId, + chunks: plan.chunks.map((c) => ({ id: c.id, from: c.from, to: c.to })), + buildQuery: (chunk) => { + const planChunk = plan.chunks.find((c) => c.id === chunk.id) + if (!planChunk) throw new Error(`Chunk ${chunk.id} not found in plan`) + return planChunk.sqlTemplate + }, + concurrency: input.concurrency, + pollIntervalMs: input.pollIntervalMs, + resumeFrom, + replayFailed: input.replayFailed, + onProgress: async (progress) => { + runState.progress = progress + runState.updatedAt = nowIso() + await writeJson(paths.runPath, runState) + }, + }) + + runState.status = result.failed > 0 ? 'failed' : 'completed' + runState.completedAt = nowIso() + runState.updatedAt = nowIso() + runState.progress = result.progress + if (result.failed > 0) { + const failedEntry = Object.values(result.progress).find((c) => c.status === 'failed') + runState.lastError = failedEntry?.error ?? 'One or more chunks failed' + } + await writeJson(paths.runPath, runState) + + const summary = summarizeRunStatus(runState, paths.runPath, plan) + + if (input.jsonMode) { + input.print({ + ok: result.failed === 0, + planId: plan.planId, + status: runState.status, + chunkCounts: summary.totals, + rowsWritten: summary.rowsWritten, + runPath: paths.runPath, + lastError: runState.lastError, + }) + } else { + let line = `Backfill ${plan.planId}: ${runState.status} (done=${summary.totals.done}/${summary.totals.total}, ${summary.rowsWritten} rows written)` + if (runState.lastError) line += ` \u2014 ${runState.lastError}` + input.print(line) + if (runState.status === 'completed' && summary.rowsWritten === 0) { + input.print( + 'Warning: 0 rows written across all chunks. Verify that source data exists in the time range and passes the query\'s WHERE filters.' + ) + } + } + + return result.failed > 0 ? 1 : 0 + } finally { + await db.close() } - return payload.ok ? 0 : 1 } export function createBackfillPlugin(options: PluginConfig = {}): BackfillPlugin { @@ -138,7 +236,7 @@ export function createBackfillPlugin(options: PluginConfig = {}): BackfillPlugin }, { name: 'run', - description: 'Execute a planned backfill with checkpointed chunk progress', + description: 'Execute a planned backfill with async query submission and polling', flags: RUN_FLAGS, run: async (context) => wrapPluginRun({ @@ -150,23 +248,24 @@ export function createBackfillPlugin(options: PluginConfig = {}): BackfillPlugin fn: async () => { const opts = resolveRunOptions(config, context.options, context.flags) - const db = context.config.clickhouse - ? createClickHouseExecutor(context.config.clickhouse) - : undefined - - try { - const output = await executeBackfillRun({ - opts, - configPath: context.configPath, - config: context.config, - execute: db ? async (sql) => { await db.command(sql); return undefined } : undefined, - clickhouse: context.config.clickhouse, - }) - - return formatRunOutput(output, 'run', context) - } finally { - await db?.close() + if (!context.config.clickhouse) { + throw new BackfillConfigError( + 'ClickHouse connection is required for backfill execution. Configure clickhouse in your clickhouse.config.ts.' + ) } + + return runBackfill({ + planId: opts.planId, + forceEnvironment: opts.forceEnvironment, + concurrency: opts.concurrency, + pollIntervalMs: opts.pollIntervalMs, + stateDir: opts.stateDir, + configPath: context.configPath, + config: context.config, + clickhouse: context.config.clickhouse, + print: context.print, + jsonMode: context.jsonMode, + }) }, }), }, @@ -184,23 +283,48 @@ export function createBackfillPlugin(options: PluginConfig = {}): BackfillPlugin fn: async () => { const opts = resolveResumeOptions(config, context.options, context.flags) - const db = context.config.clickhouse - ? createClickHouseExecutor(context.config.clickhouse) - : undefined - - try { - const output = await resumeBackfillRun({ - opts, - configPath: context.configPath, - config: context.config, - execute: db ? async (sql) => { await db.command(sql); return undefined } : undefined, - clickhouse: context.config.clickhouse, - }) + if (!context.config.clickhouse) { + throw new BackfillConfigError( + 'ClickHouse connection is required for backfill execution. Configure clickhouse in your clickhouse.config.ts.' + ) + } - return formatRunOutput(output, 'resume', context) - } finally { - await db?.close() + const { stateDir } = await readPlan({ + planId: opts.planId, + configPath: context.configPath, + config: context.config, + stateDir: opts.stateDir, + }) + const paths = backfillPaths(stateDir, opts.planId) + const existingRun = await readRun(paths.runPath) + if (!existingRun) { + throw new BackfillConfigError( + `Run state not found for plan ${opts.planId}. Start with backfill run before resume.` + ) } + if (existingRun.status === 'completed') { + if (context.jsonMode) { + context.print({ ok: true, noop: true, planId: opts.planId, status: 'completed', message: 'Run already completed. Nothing to resume.' }) + } else { + context.print(`Backfill ${opts.planId}: already completed. Nothing to resume.`) + } + return 0 + } + + return runBackfill({ + planId: opts.planId, + forceEnvironment: opts.forceEnvironment, + concurrency: opts.concurrency, + pollIntervalMs: opts.pollIntervalMs, + stateDir: opts.stateDir, + resumeFrom: existingRun.progress, + replayFailed: opts.replayFailed, + configPath: context.configPath, + config: context.config, + clickhouse: context.config.clickhouse, + print: context.print, + jsonMode: context.jsonMode, + }) }, }), }, diff --git a/packages/plugin-backfill/src/queries.ts b/packages/plugin-backfill/src/queries.ts index 9b71a27..66780ef 100644 --- a/packages/plugin-backfill/src/queries.ts +++ b/packages/plugin-backfill/src/queries.ts @@ -4,10 +4,10 @@ import { BackfillConfigError } from './errors.js' import { backfillPaths, nowIso, - persistRunAndEvent, readPlan, readRun, summarizeRunStatus, + writeJson, } from './state.js' import type { BackfillDoctorReport, @@ -37,20 +37,18 @@ export async function getBackfillStatus(input: { totals: { total: plan.chunks.length, pending: plan.chunks.length, + submitted: 0, running: 0, done: 0, failed: 0, - skipped: 0, }, - attempts: 0, rowsWritten: 0, updatedAt: plan.createdAt, runPath: paths.runPath, - eventPath: paths.eventPath, } } - return summarizeRunStatus(run, paths.runPath, paths.eventPath) + return summarizeRunStatus(run, paths.runPath, plan) } export async function cancelBackfillRun(input: { @@ -77,29 +75,16 @@ export async function cancelBackfillRun(input: { throw new BackfillConfigError(`Run already completed for plan ${plan.planId}; cannot cancel.`) } if (run.status === 'cancelled') { - return summarizeRunStatus(run, paths.runPath, paths.eventPath) + return summarizeRunStatus(run, paths.runPath, plan) } run.status = 'cancelled' run.completedAt = nowIso() run.lastError = 'Cancelled by operator' - for (const chunk of run.chunks) { - if (chunk.status === 'running') { - chunk.status = 'pending' - } - } - await persistRunAndEvent({ - run, - runPath: paths.runPath, - eventPath: paths.eventPath, - event: { - type: 'run_cancelled', - planId: plan.planId, - }, - }) + await writeJson(paths.runPath, run) - return summarizeRunStatus(run, paths.runPath, paths.eventPath) + return summarizeRunStatus(run, paths.runPath, plan) } export async function getBackfillDoctorReport(input: { @@ -118,25 +103,25 @@ export async function getBackfillDoctorReport(input: { const run = await readRun(paths.runPath) const status = run - ? summarizeRunStatus(run, paths.runPath, paths.eventPath) + ? summarizeRunStatus(run, paths.runPath, plan) : { planId: plan.planId, target: plan.target, status: 'planned' as const, - totals: { total: plan.chunks.length, pending: plan.chunks.length, running: 0, done: 0, failed: 0, skipped: 0 }, - attempts: 0, + totals: { total: plan.chunks.length, pending: plan.chunks.length, submitted: 0, running: 0, done: 0, failed: 0 }, rowsWritten: 0, updatedAt: plan.createdAt, runPath: paths.runPath, - eventPath: paths.eventPath, } const issueCodes: string[] = [] const recommendations: string[] = [] const failedChunkIds: string[] = [] - for (const chunk of run?.chunks ?? []) { - if (chunk.status === 'failed') failedChunkIds.push(chunk.id) + if (run) { + for (const [chunkId, state] of Object.entries(run.progress)) { + if (state.status === 'failed') failedChunkIds.push(chunkId) + } } if (status.status === 'planned') { diff --git a/packages/plugin-backfill/src/runtime.test.ts b/packages/plugin-backfill/src/runtime.test.ts deleted file mode 100644 index 2cfab01..0000000 --- a/packages/plugin-backfill/src/runtime.test.ts +++ /dev/null @@ -1,782 +0,0 @@ -import { describe, expect, test } from 'bun:test' -import { mkdtemp, readFile, rm } from 'node:fs/promises' -import { join } from 'node:path' -import { tmpdir } from 'node:os' - -import { resolveConfig } from '@chkit/core' - -import { PlanSchema, RunSchema, ResumeSchema } from './options.js' -import { buildBackfillPlan } from './planner.js' -import { evaluateBackfillCheck } from './check.js' -import { cancelBackfillRun, getBackfillDoctorReport, getBackfillStatus } from './queries.js' -import { executeBackfillRun, resumeBackfillRun } from './runtime.js' - -function createMockQuery(opts: { - partitions?: Array<{ partition_id: string; total_rows: string; total_bytes: string; min_time: string; max_time: string }> -} = {}): (sql: string) => Promise { - const partitions = opts.partitions ?? [ - { partition_id: '202601a', total_rows: '500', total_bytes: '250000', min_time: '2026-01-01 00:00:00', max_time: '2026-01-01 02:00:00' }, - { partition_id: '202601b', total_rows: '500', total_bytes: '250000', min_time: '2026-01-01 02:00:00', max_time: '2026-01-01 04:00:00' }, - { partition_id: '202601c', total_rows: '500', total_bytes: '250000', min_time: '2026-01-01 04:00:00', max_time: '2026-01-01 06:00:00' }, - ] - - return async (sql: string) => { - if (sql.includes('system.parts')) return partitions as T[] - if (sql.includes('system.tables')) return [{ sorting_key: 'event_time' }] as T[] - if (sql.includes('system.columns')) return [{ type: 'DateTime' }] as T[] - return [] as T[] - } -} - -describe('@chkit/plugin-backfill run lifecycle', () => { - test('runs plan chunks and reports completed status', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - const planOpts = PlanSchema.parse({ - target: 'app.events', - from: '2026-01-01T00:00:00.000Z', - to: '2026-01-01T06:00:00.000Z', - }) - - const planned = await buildBackfillPlan({ - opts: planOpts, - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - - const runOpts = RunSchema.parse({ planId: planned.plan.planId }) - const ran = await executeBackfillRun({ - opts: runOpts, - configPath, - config, - }) - - expect(ran.status.status).toBe('completed') - expect(ran.status.totals.done).toBe(3) - expect(ran.status.totals.failed).toBe(0) - - const status = await getBackfillStatus({ - planId: planned.plan.planId, - configPath, - config, - }) - expect(status.status).toBe('completed') - expect(status.totals.done).toBe(3) - expect(status.runPath).toContain('/runs/') - expect(status.eventPath).toContain('/events/') - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) - - test('supports fail then resume without replaying done chunks', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - const planOpts = PlanSchema.parse({ - target: 'app.events', - from: '2026-01-01T00:00:00.000Z', - to: '2026-01-01T06:00:00.000Z', - maxRetriesPerChunk: 1, - }) - - const planned = await buildBackfillPlan({ - opts: planOpts, - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - - const failChunkId = planned.plan.chunks[1]?.id - expect(failChunkId).toBeTruthy() - - const runOpts = RunSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 1, - retryDelayMs: 0, - simulateFailChunk: failChunkId, - simulateFailCount: 1, - }) - const firstRun = await executeBackfillRun({ - opts: runOpts, - configPath, - config, - }) - - expect(firstRun.status.status).toBe('failed') - expect(firstRun.status.totals.done).toBe(2) - expect(firstRun.status.totals.failed).toBe(1) - - const resumeOpts = ResumeSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 1, - retryDelayMs: 0, - replayFailed: true, - }) - const resumed = await resumeBackfillRun({ - opts: resumeOpts, - configPath, - config, - }) - - expect(resumed.status.status).toBe('completed') - expect(resumed.status.totals.done).toBe(3) - - const runRaw = JSON.parse(await readFile(resumed.runPath, 'utf8')) as { - chunks: Array<{ id: string; attempts: number }> - } - const firstChunk = planned.plan.chunks[0] - const firstChunkState = runRaw.chunks.find((chunk) => chunk.id === firstChunk?.id) - expect(firstChunkState?.attempts).toBe(1) - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) - - test('blocks resume on compatibility mismatch unless forceCompatibility is enabled', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - const planOpts = PlanSchema.parse({ - target: 'app.events', - from: '2026-01-01T00:00:00.000Z', - to: '2026-01-01T06:00:00.000Z', - maxRetriesPerChunk: 1, - }) - - const planned = await buildBackfillPlan({ - opts: planOpts, - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - const failChunkId = planned.plan.chunks[1]?.id - expect(failChunkId).toBeTruthy() - - const runOpts = RunSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 1, - retryDelayMs: 0, - simulateFailChunk: failChunkId, - simulateFailCount: 1, - }) - await executeBackfillRun({ - opts: runOpts, - configPath, - config, - }) - - await expect( - resumeBackfillRun({ - opts: ResumeSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 5, - retryDelayMs: 0, - replayFailed: true, - }), - configPath, - config, - }) - ).rejects.toThrow('Run compatibility check failed') - - const resumed = await resumeBackfillRun({ - opts: ResumeSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 5, - retryDelayMs: 0, - replayFailed: true, - forceCompatibility: true, - }), - configPath, - config, - }) - expect(resumed.status.status).toBe('completed') - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) - - test('cancel marks run as cancelled and doctor reports actionable remediation', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - const planOpts = PlanSchema.parse({ - target: 'app.events', - from: '2026-01-01T00:00:00.000Z', - to: '2026-01-01T06:00:00.000Z', - }) - - const planned = await buildBackfillPlan({ - opts: planOpts, - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - await executeBackfillRun({ - opts: RunSchema.parse({ planId: planned.plan.planId }), - configPath, - config, - }) - - await expect( - cancelBackfillRun({ - planId: planned.plan.planId, - configPath, - config, - }) - ).rejects.toThrow('already completed') - - const planOpts2 = PlanSchema.parse({ - target: 'app.events', - from: '2026-01-02T00:00:00.000Z', - to: '2026-01-02T06:00:00.000Z', - maxRetriesPerChunk: 1, - }) - const planned2 = await buildBackfillPlan({ - opts: planOpts2, - configPath, - config, - clickhouseQuery: createMockQuery({ - partitions: [ - { partition_id: '202601d', total_rows: '500', total_bytes: '250000', min_time: '2026-01-02 00:00:00', max_time: '2026-01-02 02:00:00' }, - { partition_id: '202601e', total_rows: '500', total_bytes: '250000', min_time: '2026-01-02 02:00:00', max_time: '2026-01-02 04:00:00' }, - { partition_id: '202601f', total_rows: '500', total_bytes: '250000', min_time: '2026-01-02 04:00:00', max_time: '2026-01-02 06:00:00' }, - ], - }), - }) - await executeBackfillRun({ - opts: RunSchema.parse({ - planId: planned2.plan.planId, - maxRetriesPerChunk: 1, - retryDelayMs: 0, - simulateFailChunk: planned2.plan.chunks[1]?.id, - simulateFailCount: 1, - }), - configPath, - config, - }) - const cancelled = await cancelBackfillRun({ - planId: planned2.plan.planId, - configPath, - config, - }) - expect(cancelled.status).toBe('cancelled') - - const doctor = await getBackfillDoctorReport({ - planId: planned2.plan.planId, - configPath, - config, - }) - expect(doctor.issueCodes).toContain('backfill_required_pending') - expect(doctor.recommendations.join(' ')).toContain('backfill resume') - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) -}) - -describe('@chkit/plugin-backfill execute callback', () => { - test('calls execute for each chunk when provided', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - - const planned = await buildBackfillPlan({ - opts: PlanSchema.parse({ - target: 'app.events', - from: '2026-01-01T00:00:00.000Z', - to: '2026-01-01T06:00:00.000Z', - }), - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - - const executedSql: string[] = [] - const execute = async (sql: string) => { - executedSql.push(sql) - } - - const ran = await executeBackfillRun({ - opts: RunSchema.parse({ planId: planned.plan.planId }), - configPath, - config, - execute, - }) - - expect(ran.status.status).toBe('completed') - expect(ran.status.totals.done).toBe(3) - expect(executedSql).toHaveLength(3) - for (const sql of executedSql) { - expect(sql).toContain('INSERT INTO app.events') - } - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) - - test('retries and succeeds chunk when execute fails then recovers', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - - const planned = await buildBackfillPlan({ - opts: PlanSchema.parse({ - target: 'app.events', - from: '2026-01-01T00:00:00.000Z', - to: '2026-01-01T06:00:00.000Z', - maxRetriesPerChunk: 3, - }), - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - - let callCount = 0 - const execute = async (_sql: string) => { - callCount++ - if (callCount === 1) { - throw new Error('Temporary network error') - } - } - - const ran = await executeBackfillRun({ - opts: RunSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 3, - retryDelayMs: 0, - }), - configPath, - config, - execute, - }) - - expect(ran.status.status).toBe('completed') - expect(ran.status.totals.done).toBe(3) - expect(ran.status.totals.failed).toBe(0) - - const firstChunkState = ran.run.chunks[0] - expect(firstChunkState?.attempts).toBe(2) - expect(firstChunkState?.lastError).toBeUndefined() - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) - - test('retries and fails chunk when execute throws', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - - const planned = await buildBackfillPlan({ - opts: PlanSchema.parse({ - target: 'app.events', - from: '2026-01-01T00:00:00.000Z', - to: '2026-01-01T06:00:00.000Z', - maxRetriesPerChunk: 2, - }), - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - - let callCount = 0 - const execute = async (_sql: string) => { - callCount++ - if (callCount <= 2) { - throw new Error('Connection refused') - } - } - - const ran = await executeBackfillRun({ - opts: RunSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 2, - retryDelayMs: 0, - }), - configPath, - config, - execute, - }) - - expect(ran.status.status).toBe('failed') - expect(ran.status.totals.failed).toBe(1) - expect(ran.status.lastError).toContain('Connection refused') - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) -}) - -describe('@chkit/plugin-backfill continue past failures', () => { - test('continues to remaining chunks after a chunk fails permanently', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - - const planned = await buildBackfillPlan({ - opts: PlanSchema.parse({ - target: 'app.events', - from: '2026-01-01T00:00:00.000Z', - to: '2026-01-01T06:00:00.000Z', - maxRetriesPerChunk: 1, - }), - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - - const failChunkId = planned.plan.chunks[0]?.id - expect(failChunkId).toBeTruthy() - - const ran = await executeBackfillRun({ - opts: RunSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 1, - retryDelayMs: 0, - simulateFailChunk: failChunkId, - simulateFailCount: 1, - }), - configPath, - config, - }) - - expect(ran.status.status).toBe('failed') - expect(ran.status.totals.done).toBe(2) - expect(ran.status.totals.failed).toBe(1) - expect(ran.run.chunks[0]?.status).toBe('failed') - expect(ran.run.chunks[1]?.status).toBe('done') - expect(ran.run.chunks[2]?.status).toBe('done') - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) - - test('resume retries failed chunks without requiring --replay-failed', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - - const planned = await buildBackfillPlan({ - opts: PlanSchema.parse({ - target: 'app.events', - from: '2026-01-01T00:00:00.000Z', - to: '2026-01-01T06:00:00.000Z', - maxRetriesPerChunk: 1, - }), - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - - const failChunkId = planned.plan.chunks[1]?.id - expect(failChunkId).toBeTruthy() - - await executeBackfillRun({ - opts: RunSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 1, - retryDelayMs: 0, - simulateFailChunk: failChunkId, - simulateFailCount: 1, - }), - configPath, - config, - }) - - const resumed = await resumeBackfillRun({ - opts: ResumeSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 1, - retryDelayMs: 0, - }), - configPath, - config, - }) - - expect(resumed.status.status).toBe('completed') - expect(resumed.status.totals.done).toBe(3) - expect(resumed.status.totals.failed).toBe(0) - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) -}) - -describe('@chkit/plugin-backfill check integration', () => { - test('reports pending required backfills when plan exists but run is missing', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - - await buildBackfillPlan({ - opts: PlanSchema.parse({ - target: 'app.events', - from: '2026-01-01T00:00:00.000Z', - to: '2026-01-01T06:00:00.000Z', - }), - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - - const checkResult = await evaluateBackfillCheck({ - configPath, - config, - failCheckOnRequiredPendingBackfill: true, - }) - - expect(checkResult.ok).toBe(false) - expect(checkResult.findings.map((finding) => finding.code)).toContain('backfill_required_pending') - expect(checkResult.metadata?.requiredCount).toBe(1) - expect(checkResult.metadata?.activeRuns).toBe(0) - expect(checkResult.metadata?.failedRuns).toBe(0) - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) - - test('reports ok after completed run and emits no finding codes', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - - const planned = await buildBackfillPlan({ - opts: PlanSchema.parse({ - target: 'app.events', - from: '2026-01-01T00:00:00.000Z', - to: '2026-01-01T06:00:00.000Z', - }), - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - await executeBackfillRun({ - opts: RunSchema.parse({ planId: planned.plan.planId }), - configPath, - config, - }) - - const checkResult = await evaluateBackfillCheck({ - configPath, - config, - failCheckOnRequiredPendingBackfill: true, - }) - - expect(checkResult.ok).toBe(true) - expect(checkResult.findings).toEqual([]) - expect(checkResult.metadata?.requiredCount).toBe(0) - expect(checkResult.metadata?.failedRuns).toBe(0) - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) -}) - -describe('@chkit/plugin-backfill environment binding', () => { - test('rejects run against mismatched environment', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - const stagingCh = { url: 'https://staging.ch.cloud:8443', database: 'analytics' } - const prodCh = { url: 'https://prod.ch.cloud:8443', database: 'analytics' } - - const planned = await buildBackfillPlan({ - opts: PlanSchema.parse({ target: 'app.events', from: '2026-01-01T00:00:00.000Z', to: '2026-01-01T06:00:00.000Z' }), - configPath, - config, - clickhouse: stagingCh, - clickhouseQuery: createMockQuery(), - }) - - await expect( - executeBackfillRun({ - opts: RunSchema.parse({ planId: planned.plan.planId }), - configPath, - config, - clickhouse: prodCh, - }) - ).rejects.toThrow('Environment mismatch') - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) - - test('--force-environment overrides mismatch check', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - const stagingCh = { url: 'https://staging.ch.cloud:8443', database: 'analytics' } - const prodCh = { url: 'https://prod.ch.cloud:8443', database: 'analytics' } - - const planned = await buildBackfillPlan({ - opts: PlanSchema.parse({ target: 'app.events', from: '2026-01-01T00:00:00.000Z', to: '2026-01-01T06:00:00.000Z' }), - configPath, - config, - clickhouse: stagingCh, - clickhouseQuery: createMockQuery(), - }) - - const ran = await executeBackfillRun({ - opts: RunSchema.parse({ planId: planned.plan.planId, forceEnvironment: true }), - configPath, - config, - clickhouse: prodCh, - }) - - expect(ran.status.status).toBe('completed') - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) - - test('plans without environment connection info can run against any environment', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - - const planned = await buildBackfillPlan({ - opts: PlanSchema.parse({ target: 'app.events', from: '2026-01-01T00:00:00.000Z', to: '2026-01-01T06:00:00.000Z' }), - configPath, - config, - clickhouseQuery: createMockQuery(), - }) - - expect(planned.plan.environment).toBeUndefined() - - const ran = await executeBackfillRun({ - opts: RunSchema.parse({ planId: planned.plan.planId }), - configPath, - config, - clickhouse: { url: 'https://prod.ch.cloud:8443', database: 'analytics' }, - }) - - expect(ran.status.status).toBe('completed') - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) - - test('rejects resume against mismatched environment', async () => { - const dir = await mkdtemp(join(tmpdir(), 'chkit-backfill-plugin-')) - const configPath = join(dir, 'clickhouse.config.ts') - - try { - const config = resolveConfig({ - schema: './schema.ts', - metaDir: './chkit/meta', - }) - const stagingCh = { url: 'https://staging.ch.cloud:8443', database: 'analytics' } - const prodCh = { url: 'https://prod.ch.cloud:8443', database: 'analytics' } - - const planned = await buildBackfillPlan({ - opts: PlanSchema.parse({ target: 'app.events', from: '2026-01-01T00:00:00.000Z', to: '2026-01-01T06:00:00.000Z', maxRetriesPerChunk: 1 }), - configPath, - config, - clickhouse: stagingCh, - clickhouseQuery: createMockQuery(), - }) - - await executeBackfillRun({ - opts: RunSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 1, - retryDelayMs: 0, - simulateFailChunk: planned.plan.chunks[1]?.id, - simulateFailCount: 1, - }), - configPath, - config, - clickhouse: stagingCh, - }) - - await expect( - resumeBackfillRun({ - opts: ResumeSchema.parse({ - planId: planned.plan.planId, - maxRetriesPerChunk: 1, - retryDelayMs: 0, - replayFailed: true, - }), - configPath, - config, - clickhouse: prodCh, - }) - ).rejects.toThrow('Environment mismatch') - } finally { - await rm(dir, { recursive: true, force: true }) - } - }) -}) diff --git a/packages/plugin-backfill/src/runtime.ts b/packages/plugin-backfill/src/runtime.ts deleted file mode 100644 index 837a457..0000000 --- a/packages/plugin-backfill/src/runtime.ts +++ /dev/null @@ -1,423 +0,0 @@ -import type { ResolvedChxConfig } from '@chkit/core' - -import { BackfillConfigError } from './errors.js' -import { executeWorkItems } from './executor.js' -import type { ProgressEvent, WorkItem } from './executor.js' -import type { RunOptions, ResumeOptions } from './options.js' -import { - backfillPaths, - collectActiveRunTargets, - createRunState, - ensureEnvironmentMatch, - ensureRunCompatibility, - nowIso, - persistRunAndEvent, - readPlan, - readRun, - summarizeRunStatus, -} from './state.js' -import type { - BackfillPlanState, - BackfillRunChunkState, - BackfillRunState, - ExecuteBackfillRunOutput, -} from './types.js' - -/** Adapter that bridges a BackfillRunChunkState to the generic WorkItem interface. */ -interface ChunkWorkItem extends WorkItem { - chunk: BackfillRunChunkState - sqlTemplate: string -} - -function toWorkItems(chunks: BackfillRunChunkState[]): ChunkWorkItem[] { - return chunks.map((chunk) => ({ - id: chunk.id, - status: chunk.status, - attempts: chunk.attempts, - chunk, - sqlTemplate: chunk.sqlTemplate, - })) -} - -function syncBackFromWorkItem(item: ChunkWorkItem): void { - item.chunk.status = item.status - item.chunk.attempts = item.attempts -} - -async function executeRunLoop(input: { - plan: BackfillPlanState - run: BackfillRunState - paths: { - runPath: string - eventPath: string - } - replayDone: boolean - replayFailed: boolean - simulateFailChunk?: string - simulateFailCount: number - retryDelayMs: number - execute?: (sql: string) => Promise -}): Promise { - const maxRetries = input.plan.options.maxRetriesPerChunk - const ac = new AbortController() - - const onSignal = () => { ac.abort() } - process.on('SIGINT', onSignal) - process.on('SIGTERM', onSignal) - - try { - input.run.status = 'running' - input.run.replayDone = input.replayDone - input.run.replayFailed = input.replayFailed - - await persistRunAndEvent({ - run: input.run, - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - event: { - type: 'run_started', - planId: input.plan.planId, - replayDone: input.run.replayDone, - replayFailed: input.run.replayFailed, - }, - }) - - // Prepare chunks: apply skip/reset logic, then filter to executable set. - const executableItems: ChunkWorkItem[] = [] - for (const chunk of input.run.chunks) { - if (chunk.status === 'done' && !input.run.replayDone) continue - - if (chunk.status === 'failed') { - if (!input.run.replayFailed) { - continue - } - chunk.status = 'pending' - chunk.attempts = 0 - chunk.lastError = undefined - chunk.startedAt = undefined - chunk.completedAt = undefined - } - - if (chunk.status === 'running') { - chunk.status = 'pending' - } - - executableItems.push(...toWorkItems([chunk])) - } - - // Simulation support: wrap the execute function to inject failures. - const failureBudget = input.simulateFailCount - const failChunkId = input.simulateFailChunk - - const wrappedExecute = async (item: ChunkWorkItem): Promise => { - const shouldSimulateFailure = - failChunkId === item.id && item.attempts <= failureBudget - - if (shouldSimulateFailure) { - throw new Error(`Simulated failure for chunk ${item.id} attempt ${item.attempts}`) - } - - if (input.execute) { - const result = await input.execute(item.sqlTemplate) - if (result && typeof result === 'object' && typeof result.rowsWritten === 'number') { - item.chunk.rowsWritten = result.rowsWritten - } - } - } - - const result = await executeWorkItems( - executableItems, - wrappedExecute, - { maxRetries, retryDelayMs: input.retryDelayMs }, - { - onProgress: async (item: ChunkWorkItem, event: ProgressEvent, meta) => { - // Keep the chunk state in sync with the work item - syncBackFromWorkItem(item) - - if (event === 'item_started') { - item.chunk.startedAt = nowIso() - await persistRunAndEvent({ - run: input.run, - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - event: { - type: 'chunk_started', - planId: input.run.planId, - chunkId: item.id, - attempt: item.attempts, - }, - }) - } else if (event === 'item_done') { - item.chunk.completedAt = nowIso() - item.chunk.lastError = undefined - await persistRunAndEvent({ - run: input.run, - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - event: { - type: 'chunk_done', - planId: input.run.planId, - chunkId: item.id, - attempt: item.attempts, - }, - }) - } else if (event === 'item_retry') { - item.chunk.lastError = meta?.error - await persistRunAndEvent({ - run: input.run, - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - event: { - type: 'chunk_retry_scheduled', - planId: input.run.planId, - chunkId: item.id, - attempt: item.attempts, - nextAttempt: meta?.nextAttempt, - }, - }) - } else if (event === 'item_failed') { - item.chunk.lastError = meta?.error - await persistRunAndEvent({ - run: input.run, - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - event: { - type: 'chunk_failed_retry_exhausted', - planId: input.run.planId, - chunkId: item.id, - attempt: item.attempts, - message: meta?.error, - }, - }) - } - }, - }, - ac.signal, - ) - - // Determine final run status - const failedChunks = input.run.chunks.filter((c) => c.status === 'failed') - - if (!result.aborted && failedChunks.length > 0) { - input.run.status = 'failed' - input.run.lastError = - failedChunks[failedChunks.length - 1]?.lastError ?? 'One or more chunks failed' - input.run.completedAt = nowIso() - - await persistRunAndEvent({ - run: input.run, - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - event: { - type: 'run_completed_with_failures', - planId: input.plan.planId, - failedCount: failedChunks.length, - totalCount: input.run.chunks.length, - }, - }) - - return { - run: input.run, - status: summarizeRunStatus(input.run, input.paths.runPath, input.paths.eventPath), - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - } - } - - if (!result.aborted) { - input.run.status = 'completed' - input.run.completedAt = nowIso() - input.run.lastError = undefined - - await persistRunAndEvent({ - run: input.run, - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - event: { - type: 'run_completed', - planId: input.plan.planId, - }, - }) - } - - return { - run: input.run, - status: summarizeRunStatus(input.run, input.paths.runPath, input.paths.eventPath), - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - } - } finally { - process.removeListener('SIGINT', onSignal) - process.removeListener('SIGTERM', onSignal) - - for (const chunk of input.run.chunks) { - if (chunk.status === 'running') { - chunk.status = 'pending' - } - } - - if (input.run.status === 'running') { - input.run.status = 'paused' - await persistRunAndEvent({ - run: input.run, - runPath: input.paths.runPath, - eventPath: input.paths.eventPath, - event: { - type: 'run_paused', - planId: input.plan.planId, - reason: 'process_exit', - }, - }) - } - } -} - -async function assertNoOverlappingActiveRun(input: { - runsDir: string - planId: string - target: string -}): Promise { - const activeTargets = await collectActiveRunTargets(input.runsDir) - for (const [activePlanId, activeTarget] of activeTargets.entries()) { - if (activePlanId === input.planId) continue - if (activeTarget !== input.target) continue - throw new BackfillConfigError( - `Overlapping active run detected for target ${input.target} (plan ${activePlanId}). Retry with --force-overlap to override.` - ) - } -} - -export async function executeBackfillRun(input: { - opts: RunOptions - configPath: string - config: Pick - execute?: (sql: string) => Promise - clickhouse?: { url: string; database: string } -}): Promise { - const { opts } = input - const { plan, stateDir } = await readPlan({ - planId: opts.planId, - configPath: input.configPath, - config: input.config, - stateDir: opts.stateDir, - }) - - ensureEnvironmentMatch({ - plan, - clickhouse: input.clickhouse, - forceEnvironment: opts.forceEnvironment, - }) - - const paths = backfillPaths(stateDir, plan.planId) - - if (opts.blockOverlappingRuns && !opts.forceOverlap) { - await assertNoOverlappingActiveRun({ - runsDir: paths.runsDir, - planId: plan.planId, - target: plan.target, - }) - } - - let run = await readRun(paths.runPath) - if (!run) { - run = createRunState({ plan, opts }) - } else { - ensureRunCompatibility({ - run, - plan, - opts, - forceCompatibility: opts.forceCompatibility, - }) - } - - if (run.status === 'completed' && !opts.replayDone && !opts.replayFailed) { - return { - run, - status: summarizeRunStatus(run, paths.runPath, paths.eventPath), - runPath: paths.runPath, - eventPath: paths.eventPath, - noop: true, - } - } - if (run.status === 'cancelled') { - throw new BackfillConfigError( - `Run is cancelled for plan ${plan.planId}. Create a new plan or inspect with backfill doctor.` - ) - } - - return executeRunLoop({ - plan, - run, - paths, - replayDone: opts.replayDone, - replayFailed: opts.replayFailed, - simulateFailChunk: opts.simulateFailChunk, - simulateFailCount: opts.simulateFailCount, - retryDelayMs: opts.retryDelayMs, - execute: input.execute, - }) -} - -export async function resumeBackfillRun(input: { - opts: ResumeOptions - configPath: string - config: Pick - execute?: (sql: string) => Promise - clickhouse?: { url: string; database: string } -}): Promise { - const { opts } = input - const { plan, stateDir } = await readPlan({ - planId: opts.planId, - configPath: input.configPath, - config: input.config, - stateDir: opts.stateDir, - }) - - ensureEnvironmentMatch({ - plan, - clickhouse: input.clickhouse, - forceEnvironment: opts.forceEnvironment, - }) - - const paths = backfillPaths(stateDir, plan.planId) - const run = await readRun(paths.runPath) - - if (!run) { - throw new BackfillConfigError( - `Run state not found for plan ${plan.planId}. Start with backfill run before resume.` - ) - } - - ensureRunCompatibility({ - run, - plan, - opts, - forceCompatibility: opts.forceCompatibility, - }) - if (opts.blockOverlappingRuns && !opts.forceOverlap) { - await assertNoOverlappingActiveRun({ - runsDir: paths.runsDir, - planId: plan.planId, - target: plan.target, - }) - } - if (run.status === 'cancelled') { - throw new BackfillConfigError( - `Run is cancelled for plan ${plan.planId}. Create a new plan or inspect with backfill doctor.` - ) - } - - // Resume always retries failed chunks — the whole point of resume is to - // recover from failures. Users shouldn't need --replay-failed for this. - return executeRunLoop({ - plan, - run, - paths, - replayDone: opts.replayDone, - replayFailed: true, - simulateFailChunk: undefined, - simulateFailCount: 0, - retryDelayMs: opts.retryDelayMs, - execute: input.execute, - }) -} diff --git a/packages/plugin-backfill/src/state.ts b/packages/plugin-backfill/src/state.ts index 1254af0..45dd900 100644 --- a/packages/plugin-backfill/src/state.ts +++ b/packages/plugin-backfill/src/state.ts @@ -1,12 +1,11 @@ import { createHash, randomBytes } from 'node:crypto' import { existsSync } from 'node:fs' -import { appendFile, mkdir, readFile, readdir, writeFile } from 'node:fs/promises' +import { mkdir, readFile, readdir, writeFile } from 'node:fs/promises' import { dirname, join, resolve } from 'node:path' import type { ResolvedChxConfig } from '@chkit/core' import { BackfillConfigError } from './errors.js' -import type { CompatOptions } from './options.js' import type { BackfillEnvironment, BackfillPathSet, @@ -24,71 +23,26 @@ export function nowIso(): string { return new Date().toISOString() } -export function stableSerialize(value: unknown): string { - if (value === null || typeof value !== 'object') { - return JSON.stringify(value) - } - - if (Array.isArray(value)) { - return `[${value.map((item) => stableSerialize(item)).join(',')}]` - } - - const entries = Object.entries(value).sort(([a], [b]) => a.localeCompare(b)) - return `{${entries - .map(([key, item]) => `${JSON.stringify(key)}:${stableSerialize(item)}`) - .join(',')}}` -} - -export function computeCompatibilityToken(input: { - plan: BackfillPlanState - opts: CompatOptions -}): string { - return hashId( - stableSerialize({ - planId: input.plan.planId, - target: input.plan.target, - from: input.plan.from, - to: input.plan.to, - planOptions: input.plan.options, - runtimeDefaults: { - maxChunkBytes: input.opts.maxChunkBytes, - maxParallelChunks: input.opts.maxParallelChunks, - maxRetriesPerChunk: input.opts.maxRetriesPerChunk, - requireIdempotencyToken: input.opts.requireIdempotencyToken, - }, - runtimePolicy: { - blockOverlappingRuns: input.opts.blockOverlappingRuns, - failCheckOnRequiredPendingBackfill: input.opts.failCheckOnRequiredPendingBackfill, - requireDryRunBeforeRun: input.opts.requireDryRunBeforeRun, - requireExplicitWindow: input.opts.requireExplicitWindow, - }, - runtimeLimits: { - maxWindowHours: input.opts.maxWindowHours, - minChunkMinutes: input.opts.minChunkMinutes, - }, - }) - ) -} - export function randomPlanId(): string { return randomBytes(8).toString('hex') } export function computeEnvironmentFingerprint( - clickhouse: { url: string; database: string } | undefined + clickhouse: { url: string; database?: string } | undefined ): BackfillEnvironment | undefined { if (!clickhouse) return undefined + const database = clickhouse.database ?? 'default' const origin = new URL(clickhouse.url).origin return { - fingerprint: hashId(`${origin}|${clickhouse.database}`).slice(0, 16), + fingerprint: hashId(`${origin}|${database}`).slice(0, 16), url: origin, - database: clickhouse.database, + database, } } export function ensureEnvironmentMatch(input: { plan: BackfillPlanState - clickhouse: { url: string; database: string } | undefined + clickhouse: { url: string; database?: string } | undefined forceEnvironment: boolean }): void { if (!input.plan.environment) return @@ -121,15 +75,12 @@ export function computeBackfillStateDir( export function backfillPaths(stateDir: string, planId: string): BackfillPathSet { const plansDir = join(stateDir, 'plans') const runsDir = join(stateDir, 'runs') - const eventsDir = join(stateDir, 'events') return { stateDir, plansDir, runsDir, - eventsDir, planPath: join(plansDir, `${planId}.json`), runPath: join(runsDir, `${planId}.json`), - eventPath: join(eventsDir, `${planId}.ndjson`), } } @@ -143,11 +94,6 @@ export async function writeJson(filePath: string, value: unknown): Promise await writeFile(filePath, `${JSON.stringify(value, null, 2)}\n`, 'utf8') } -async function appendEvent(eventPath: string, event: Record): Promise { - await mkdir(dirname(eventPath), { recursive: true }) - await appendFile(eventPath, `${JSON.stringify({ at: nowIso(), ...event })}\n`, 'utf8') -} - export async function readPlan(input: { planId: string configPath: string @@ -171,54 +117,6 @@ export async function readRun(runPath: string): Promise return readJsonMaybe(runPath) } -export function createRunState(input: { - plan: BackfillPlanState - opts: CompatOptions & { replayDone: boolean; replayFailed: boolean } -}): BackfillRunState { - const startedAt = nowIso() - return { - planId: input.plan.planId, - target: input.plan.target, - status: 'planned', - createdAt: startedAt, - startedAt, - updatedAt: startedAt, - replayDone: input.opts.replayDone, - replayFailed: input.opts.replayFailed, - compatibilityToken: computeCompatibilityToken({ - plan: input.plan, - opts: input.opts, - }), - options: input.plan.options, - chunks: input.plan.chunks.map((chunk) => ({ - id: chunk.id, - from: chunk.from, - to: chunk.to, - status: 'pending', - attempts: 0, - idempotencyToken: chunk.idempotencyToken, - sqlTemplate: chunk.sqlTemplate, - })), - } -} - -export async function collectActiveRunTargets(runsDir: string): Promise> { - const active = new Map() - if (!existsSync(runsDir)) return active - - const entries = await readdir(runsDir, { withFileTypes: true }) - for (const entry of entries) { - if (!entry.isFile() || !entry.name.endsWith('.json')) continue - const file = join(runsDir, entry.name) - const run = await readRun(file) - if (!run) continue - if (run.status !== 'running') continue - active.set(run.planId, run.target) - } - - return active -} - export async function listPlanIds(plansDir: string): Promise { if (!existsSync(plansDir)) return [] const entries = await readdir(plansDir, { withFileTypes: true }) @@ -231,69 +129,40 @@ export async function listPlanIds(plansDir: string): Promise { export function summarizeRunStatus( run: BackfillRunState, runPath: string, - eventPath: string + plan: BackfillPlanState, ): BackfillStatusSummary { - const summary = { - total: run.chunks.length, + const totals = { + total: plan.chunks.length, pending: 0, + submitted: 0, running: 0, done: 0, failed: 0, - skipped: 0, } - let attempts = 0 let rowsWritten = 0 - for (const chunk of run.chunks) { - attempts += chunk.attempts - rowsWritten += chunk.rowsWritten ?? 0 - if (chunk.status === 'pending') summary.pending += 1 - if (chunk.status === 'running') summary.running += 1 - if (chunk.status === 'done') summary.done += 1 - if (chunk.status === 'failed') summary.failed += 1 - if (chunk.status === 'skipped') summary.skipped += 1 + for (const chunk of plan.chunks) { + const state = run.progress[chunk.id] + if (!state) { + totals.pending += 1 + continue + } + rowsWritten += state.writtenRows ?? 0 + if (state.status === 'pending') totals.pending += 1 + else if (state.status === 'submitted') totals.submitted += 1 + else if (state.status === 'running') totals.running += 1 + else if (state.status === 'done') totals.done += 1 + else if (state.status === 'failed') totals.failed += 1 } return { planId: run.planId, target: run.target, status: run.status, - totals: summary, - attempts, + totals, rowsWritten, updatedAt: run.updatedAt, runPath, - eventPath, lastError: run.lastError, } } - -export async function persistRunAndEvent(input: { - run: BackfillRunState - runPath: string - eventPath: string - event: Record -}): Promise { - input.run.updatedAt = nowIso() - await writeJson(input.runPath, input.run) - await appendEvent(input.eventPath, input.event) -} - -export function ensureRunCompatibility(input: { - run: BackfillRunState - plan: BackfillPlanState - opts: CompatOptions - forceCompatibility: boolean -}): void { - if (!input.run.compatibilityToken) return - const expected = computeCompatibilityToken({ - plan: input.plan, - opts: input.opts, - }) - if (input.run.compatibilityToken === expected) return - if (input.forceCompatibility) return - - throw new BackfillConfigError( - `Run compatibility check failed for plan ${input.plan.planId}. Runtime options changed since last checkpoint. Retry with --force-compatibility to acknowledge override.` - ) -} diff --git a/packages/plugin-backfill/src/types.ts b/packages/plugin-backfill/src/types.ts index d03243d..f3b50da 100644 --- a/packages/plugin-backfill/src/types.ts +++ b/packages/plugin-backfill/src/types.ts @@ -1,5 +1,6 @@ import type { ChxInlinePluginRegistration, ResolvedChxConfig } from '@chkit/core' +import type { BackfillProgress } from './async-backfill.js' import type { PartitionInfo, SortKeyInfo } from './chunking/types.js' import type { PluginConfig } from './options.js' @@ -65,34 +66,15 @@ export interface BackfillPlanState { } } -export interface BackfillRunChunkState { - id: string - from: string - to: string - status: 'pending' | 'running' | 'done' | 'failed' | 'skipped' - attempts: number - idempotencyToken: string - sqlTemplate: string - startedAt?: string - completedAt?: string - lastError?: string - rowsWritten?: number -} - export interface BackfillRunState { planId: string target: string - status: BackfillPlanStatus - createdAt: string + status: 'running' | 'completed' | 'failed' | 'cancelled' startedAt: string updatedAt: string completedAt?: string lastError?: string - replayDone: boolean - replayFailed: boolean - compatibilityToken: string - options: BackfillPlanState['options'] - chunks: BackfillRunChunkState[] + progress: BackfillProgress } export interface BackfillStatusSummary { @@ -102,16 +84,14 @@ export interface BackfillStatusSummary { totals: { total: number pending: number + submitted: number running: number done: number failed: number - skipped: number } - attempts: number rowsWritten: number updatedAt: string runPath: string - eventPath: string lastError?: string } @@ -158,10 +138,8 @@ export interface BackfillPathSet { stateDir: string plansDir: string runsDir: string - eventsDir: string planPath: string runPath: string - eventPath: string } export interface BackfillDoctorReport { @@ -172,14 +150,6 @@ export interface BackfillDoctorReport { failedChunkIds: string[] } -export interface ExecuteBackfillRunOutput { - run: BackfillRunState - status: BackfillStatusSummary - runPath: string - eventPath: string - noop?: boolean -} - export interface BackfillPluginCommandContext { args: string[] flags: Record