Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/obsessiondb-query-routing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"chkit": patch
"@chkit/plugin-obsessiondb": patch
---

Add query routing through ObsessionDB: core commands (migrate, status, drift, check) use a plugin-provided ClickHouse executor when authenticated with a selected service. Adds `getContext` plugin hook, per-project service binding during login, `select-service` command, and remote executor that proxies queries through the ObsessionDB API.
1 change: 1 addition & 0 deletions packages/cli/src/bin/chkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ async function main(): Promise<void> {

const initCtx = {
command: commandName,
configPath,
isInteractive: process.stdin.isTTY === true && process.stderr.isTTY === true,
jsonMode: argv.includes('--json'),
}
Expand Down
42 changes: 32 additions & 10 deletions packages/cli/src/bin/command-dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,25 @@ async function runCoreOrBuiltinCommand(input: {

const dirs = resolveDirs(input.config)
if (!input.resolved.run) throw new Error(`Command '${input.commandName}' has no run handler`)
await input.resolved.run({
command: input.commandName,
flags,
const ctx = await input.pluginRuntime.resolveContext({
config: input.config,
configPath: input.configPath,
dirs,
pluginRuntime: input.pluginRuntime,
command: input.commandName,
flags,
})
try {
await input.resolved.run({
command: input.commandName,
flags,
config: input.config,
configPath: input.configPath,
dirs,
pluginRuntime: input.pluginRuntime,
ctx,
})
} finally {
await input.pluginRuntime.disposeContext(ctx)
}
return
}

Expand All @@ -171,14 +182,25 @@ async function runCoreOrBuiltinCommand(input: {

const dirs = resolveDirs(input.config)
if (!input.resolved.run) throw new Error(`Command '${input.commandName}' has no run handler`)
await input.resolved.run({
command: input.commandName,
flags,
const ctx = await input.pluginRuntime.resolveContext({
config: input.config,
configPath: input.configPath,
dirs,
pluginRuntime: input.pluginRuntime,
command: input.commandName,
flags,
})
try {
await input.resolved.run({
command: input.commandName,
flags,
config: input.config,
configPath: input.configPath,
dirs,
pluginRuntime: input.pluginRuntime,
ctx,
})
} finally {
await input.pluginRuntime.disposeContext(ctx)
}
}

export async function runResolvedCommand(input: {
Expand Down
16 changes: 8 additions & 8 deletions packages/cli/src/bin/commands/check.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { mkdir } from 'node:fs/promises'

import { summarizeDriftReasons } from '../../drift.js'
import { typedFlags, type CommandDef, type CommandRunContext } from '../../plugins.js'
import { withClickHouseExecutor } from '../clickhouse-resource.js'
import { GLOBAL_FLAGS } from '../global-flags.js'
import { emitJson } from '../json-output.js'
import { createJournalStore } from '../journal-store.js'
Expand All @@ -19,21 +18,22 @@ export const checkCommand: CommandDef = {
run: cmdCheck,
}

async function cmdCheck(ctx: CommandRunContext): Promise<void> {
const { flags, config, configPath, dirs, pluginRuntime } = ctx
async function cmdCheck(runCtx: CommandRunContext): Promise<void> {
const { flags, config, configPath, dirs, pluginRuntime, ctx } = runCtx
const f = typedFlags(flags, [...GLOBAL_FLAGS, { name: '--strict', type: 'boolean', description: 'Enable all policy checks' }] as const)
const strict = f['--strict'] === true
const jsonMode = f['--json'] === true
const tableSelector = f['--table']
const { migrationsDir, metaDir } = dirs
await mkdir(migrationsDir, { recursive: true })

if (!config.clickhouse) {
if (!ctx.hasExecutor) {
throw new Error('clickhouse config is required for check (journal is stored in ClickHouse)')
}
const db = ctx.executor
const database = config.clickhouse?.database

const database = config.clickhouse.database
await withClickHouseExecutor(config.clickhouse, async (db) => {
{
const journalStore = createJournalStore(db)

const files = await listMigrations(migrationsDir)
Expand Down Expand Up @@ -66,7 +66,7 @@ async function cmdCheck(ctx: CommandRunContext): Promise<void> {
}
return
}
const drift = snapshot ? await buildDriftPayload(config, metaDir, snapshot, tableScope) : null
const drift = snapshot ? await buildDriftPayload(config, metaDir, snapshot, tableScope, db) : null

const policy = {
failOnPending: strict ? true : config.check?.failOnPending ?? true,
Expand Down Expand Up @@ -187,5 +187,5 @@ async function cmdCheck(ctx: CommandRunContext): Promise<void> {
console.log(`Failed checks: ${failedChecks.join(', ')}`)
process.exitCode = 1
}
})
}
}
175 changes: 89 additions & 86 deletions packages/cli/src/bin/commands/drift.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { join } from 'node:path'

import { isUnknownDatabaseError, type SchemaObjectRef } from '@chkit/clickhouse'
import { isUnknownDatabaseError, type ClickHouseExecutor, type SchemaObjectRef } from '@chkit/clickhouse'
import type { ChxConfig, Snapshot, TableDefinition } from '@chkit/core'

import { typedFlags, type CommandDef, type CommandRunContext } from '../../plugins.js'
import { withClickHouseExecutor } from '../clickhouse-resource.js'
import { GLOBAL_FLAGS } from '../global-flags.js'
import {
compareSchemaObjects,
Expand Down Expand Up @@ -42,101 +41,102 @@ export async function buildDriftPayload(
config: ChxConfig,
metaDir: string,
snapshot: Snapshot,
scope?: TableScope
scope?: TableScope,
executor?: ClickHouseExecutor
): Promise<DriftPayload> {
const selectedTables =
scope?.enabled && scope.matchCount > 0 ? new Set(scope.matchedTables) : undefined
if (!config.clickhouse) throw new Error('clickhouse config is required for drift checks')
return withClickHouseExecutor(config.clickhouse, async (db) => {
let actualObjects: SchemaObjectRef[]
try {
actualObjects = await db.listSchemaObjects()
} catch (error) {
if (isUnknownDatabaseError(error)) {
const allExpected = snapshot.definitions
.filter((def) => {
if (!selectedTables) return true
if (def.kind !== 'table') return true
return selectedTables.has(`${def.database}.${def.name}`)
})
.map((def) => `${def.database}.${def.name}`)
return {
scope,
snapshotFile: join(metaDir, 'snapshot.json'),
expectedCount: allExpected.length,
actualCount: 0,
drifted: allExpected.length > 0,
databaseMissing: true,
database: config.clickhouse?.database,
missing: allExpected,
extra: [],
kindMismatches: [],
objectDrift: [],
tableDrift: [],
}
if (!executor) throw new Error('clickhouse config is required for drift checks')
const db = executor

let actualObjects: SchemaObjectRef[]
try {
actualObjects = await db.listSchemaObjects()
} catch (error) {
if (isUnknownDatabaseError(error)) {
const allExpected = snapshot.definitions
.filter((def) => {
if (!selectedTables) return true
if (def.kind !== 'table') return true
return selectedTables.has(`${def.database}.${def.name}`)
})
.map((def) => `${def.database}.${def.name}`)
return {
scope,
snapshotFile: join(metaDir, 'snapshot.json'),
expectedCount: allExpected.length,
actualCount: 0,
drifted: allExpected.length > 0,
databaseMissing: true,
database: config.clickhouse?.database,
missing: allExpected,
extra: [],
kindMismatches: [],
objectDrift: [],
tableDrift: [],
}
throw error
}
const expectedObjects = snapshot.definitions
.filter((def) => {
if (!selectedTables) return true
if (def.kind !== 'table') return true
return selectedTables.has(`${def.database}.${def.name}`)
})
.map((def) => ({
kind: def.kind,
database: def.database,
name: def.name,
}))
const expectedDatabases = new Set(expectedObjects.map((def) => def.database))
const actualInScope = actualObjects.filter((item) => expectedDatabases.has(item.database))

const { missing, extra, kindMismatches, objectDrift } = compareSchemaObjects(
expectedObjects,
actualInScope
)

const expectedTables = snapshot.definitions.filter((def): def is TableDefinition => {
if (def.kind !== 'table') return false
throw error
}
const expectedObjects = snapshot.definitions
.filter((def) => {
if (!selectedTables) return true
if (def.kind !== 'table') return true
return selectedTables.has(`${def.database}.${def.name}`)
})
const expectedTableMap = new Map(
expectedTables.map((table) => [`${table.database}.${table.name}`, table])
)
const actualTables = await db.listTableDetails([...expectedDatabases])
const tableDrift = actualTables
.map((actual) => {
const expected = expectedTableMap.get(`${actual.database}.${actual.name}`)
if (!expected) return null
return compareTableShape(expected, actual)
})
.filter((item): item is NonNullable<typeof item> => item !== null)
.sort((a, b) => a.table.localeCompare(b.table))
.map((def) => ({
kind: def.kind,
database: def.database,
name: def.name,
}))
const expectedDatabases = new Set(expectedObjects.map((def) => def.database))
const actualInScope = actualObjects.filter((item) => expectedDatabases.has(item.database))

const drifted =
missing.length > 0 ||
extra.length > 0 ||
kindMismatches.length > 0 ||
objectDrift.length > 0 ||
tableDrift.length > 0
return {
scope,
snapshotFile: join(metaDir, 'snapshot.json'),
expectedCount: expectedObjects.length,
actualCount: actualInScope.length,
drifted,
missing,
extra,
kindMismatches,
objectDrift,
tableDrift,
}
const { missing, extra, kindMismatches, objectDrift } = compareSchemaObjects(
expectedObjects,
actualInScope
)

const expectedTables = snapshot.definitions.filter((def): def is TableDefinition => {
if (def.kind !== 'table') return false
if (!selectedTables) return true
return selectedTables.has(`${def.database}.${def.name}`)
})
const expectedTableMap = new Map(
expectedTables.map((table) => [`${table.database}.${table.name}`, table])
)
const actualTables = await db.listTableDetails([...expectedDatabases])
const tableDrift = actualTables
.map((actual) => {
const expected = expectedTableMap.get(`${actual.database}.${actual.name}`)
if (!expected) return null
return compareTableShape(expected, actual)
})
.filter((item): item is NonNullable<typeof item> => item !== null)
.sort((a, b) => a.table.localeCompare(b.table))

const drifted =
missing.length > 0 ||
extra.length > 0 ||
kindMismatches.length > 0 ||
objectDrift.length > 0 ||
tableDrift.length > 0
return {
scope,
snapshotFile: join(metaDir, 'snapshot.json'),
expectedCount: expectedObjects.length,
actualCount: actualInScope.length,
drifted,
missing,
extra,
kindMismatches,
objectDrift,
tableDrift,
}
}

async function cmdDrift(ctx: CommandRunContext): Promise<void> {
const { flags, config, dirs } = ctx
async function cmdDrift(runCtx: CommandRunContext): Promise<void> {
const { flags, config, dirs, ctx } = runCtx
const f = typedFlags(flags, GLOBAL_FLAGS)
const tableSelector = f['--table']
const jsonMode = f['--json'] === true
Expand Down Expand Up @@ -169,7 +169,10 @@ async function cmdDrift(ctx: CommandRunContext): Promise<void> {
console.log(`No tables matched selector "${scope.selector ?? ''}". Drift check is a no-op.`)
return
}
const payload = await buildDriftPayload(config, metaDir, snapshot, scope)
if (!ctx.hasExecutor) {
throw new Error('clickhouse config is required for drift checks')
}
const payload = await buildDriftPayload(config, metaDir, snapshot, scope, ctx.executor)

if (jsonMode) {
emitJson('drift', payload)
Expand Down
12 changes: 6 additions & 6 deletions packages/cli/src/bin/commands/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { createInterface } from 'node:readline/promises'

import { defineFlags, typedFlags, type CommandDef, type CommandRunContext } from '../../plugins.js'
import { waitForDDLPropagation } from '@chkit/clickhouse'
import { withClickHouseExecutor } from '../clickhouse-resource.js'
import { GLOBAL_FLAGS } from '../global-flags.js'
import { emitJson } from '../json-output.js'
import { createJournalStore } from '../journal-store.js'
Expand Down Expand Up @@ -95,8 +94,8 @@ async function filterPendingByScope(
return inScope
}

async function cmdMigrate(ctx: CommandRunContext): Promise<void> {
const { flags, config, configPath, dirs, pluginRuntime } = ctx
async function cmdMigrate(runCtx: CommandRunContext): Promise<void> {
const { flags, config, configPath, dirs, pluginRuntime, ctx } = runCtx
const f = typedFlags(flags, [...GLOBAL_FLAGS, ...MIGRATE_FLAGS] as const)
const executeRequested = f['--apply'] === true || f['--execute'] === true
const allowDestructive = f['--allow-destructive'] === true
Expand All @@ -105,11 +104,12 @@ async function cmdMigrate(ctx: CommandRunContext): Promise<void> {

const { migrationsDir, metaDir } = dirs

if (!config.clickhouse) {
if (!ctx.hasExecutor) {
throw new Error('clickhouse config is required for migrate (journal is stored in ClickHouse)')
}
const db = ctx.executor

await withClickHouseExecutor(config.clickhouse, async (db) => {
{
const journalStore = createJournalStore(db)
const snapshot = await readSnapshot(metaDir)
const tableScope = resolveTableScope(tableSelector, tableKeysFromDefinitions(snapshot?.definitions ?? []))
Expand Down Expand Up @@ -327,5 +327,5 @@ async function cmdMigrate(ctx: CommandRunContext): Promise<void> {
}

console.log(`\nMigrations recorded in ClickHouse _chkit_migrations table.`)
})
}
}
Loading
Loading