diff --git a/.changeset/obsessiondb-query-routing.md b/.changeset/obsessiondb-query-routing.md new file mode 100644 index 0000000..cf11ee0 --- /dev/null +++ b/.changeset/obsessiondb-query-routing.md @@ -0,0 +1,6 @@ +--- +"chkit": patch +"@chkit/plugin-obsessiondb": patch +--- + +Add query routing through ObsessionDB: core commands (migrate, status, drift, check) use a plugin-provided ClickHouse executor when authenticated with a selected service. Adds `getContext` plugin hook, per-project service binding during login, `select-service` command, and remote executor that proxies queries through the ObsessionDB API. diff --git a/packages/cli/src/bin/chkit.ts b/packages/cli/src/bin/chkit.ts index 67f1c6f..cc3ec5b 100644 --- a/packages/cli/src/bin/chkit.ts +++ b/packages/cli/src/bin/chkit.ts @@ -120,6 +120,7 @@ async function main(): Promise { const initCtx = { command: commandName, + configPath, isInteractive: process.stdin.isTTY === true && process.stderr.isTTY === true, jsonMode: argv.includes('--json'), } diff --git a/packages/cli/src/bin/command-dispatch.ts b/packages/cli/src/bin/command-dispatch.ts index 0acc540..73d9831 100644 --- a/packages/cli/src/bin/command-dispatch.ts +++ b/packages/cli/src/bin/command-dispatch.ts @@ -154,14 +154,25 @@ async function runCoreOrBuiltinCommand(input: { const dirs = resolveDirs(input.config) if (!input.resolved.run) throw new Error(`Command '${input.commandName}' has no run handler`) - await input.resolved.run({ - command: input.commandName, - flags, + const ctx = await input.pluginRuntime.resolveContext({ config: input.config, configPath: input.configPath, - dirs, - pluginRuntime: input.pluginRuntime, + command: input.commandName, + flags, }) + try { + await input.resolved.run({ + command: input.commandName, + flags, + config: input.config, + configPath: input.configPath, + dirs, + pluginRuntime: input.pluginRuntime, + ctx, + }) + } finally { + await input.pluginRuntime.disposeContext(ctx) + } return } @@ -171,14 +182,25 @@ async function runCoreOrBuiltinCommand(input: { const dirs = resolveDirs(input.config) if (!input.resolved.run) throw new Error(`Command '${input.commandName}' has no run handler`) - await input.resolved.run({ - command: input.commandName, - flags, + const ctx = await input.pluginRuntime.resolveContext({ config: input.config, configPath: input.configPath, - dirs, - pluginRuntime: input.pluginRuntime, + command: input.commandName, + flags, }) + try { + await input.resolved.run({ + command: input.commandName, + flags, + config: input.config, + configPath: input.configPath, + dirs, + pluginRuntime: input.pluginRuntime, + ctx, + }) + } finally { + await input.pluginRuntime.disposeContext(ctx) + } } export async function runResolvedCommand(input: { diff --git a/packages/cli/src/bin/commands/check.ts b/packages/cli/src/bin/commands/check.ts index 89d68ff..5e0e94d 100644 --- a/packages/cli/src/bin/commands/check.ts +++ b/packages/cli/src/bin/commands/check.ts @@ -2,7 +2,6 @@ import { mkdir } from 'node:fs/promises' import { summarizeDriftReasons } from '../../drift.js' import { typedFlags, type CommandDef, type CommandRunContext } from '../../plugins.js' -import { withClickHouseExecutor } from '../clickhouse-resource.js' import { GLOBAL_FLAGS } from '../global-flags.js' import { emitJson } from '../json-output.js' import { createJournalStore } from '../journal-store.js' @@ -19,8 +18,8 @@ export const checkCommand: CommandDef = { run: cmdCheck, } -async function cmdCheck(ctx: CommandRunContext): Promise { - const { flags, config, configPath, dirs, pluginRuntime } = ctx +async function cmdCheck(runCtx: CommandRunContext): Promise { + const { flags, config, configPath, dirs, pluginRuntime, ctx } = runCtx const f = typedFlags(flags, [...GLOBAL_FLAGS, { name: '--strict', type: 'boolean', description: 'Enable all policy checks' }] as const) const strict = f['--strict'] === true const jsonMode = f['--json'] === true @@ -28,12 +27,13 @@ async function cmdCheck(ctx: CommandRunContext): Promise { const { migrationsDir, metaDir } = dirs await mkdir(migrationsDir, { recursive: true }) - if (!config.clickhouse) { + if (!ctx.hasExecutor) { throw new Error('clickhouse config is required for check (journal is stored in ClickHouse)') } + const db = ctx.executor + const database = config.clickhouse?.database - const database = config.clickhouse.database - await withClickHouseExecutor(config.clickhouse, async (db) => { + { const journalStore = createJournalStore(db) const files = await listMigrations(migrationsDir) @@ -66,7 +66,7 @@ async function cmdCheck(ctx: CommandRunContext): Promise { } return } - const drift = snapshot ? await buildDriftPayload(config, metaDir, snapshot, tableScope) : null + const drift = snapshot ? await buildDriftPayload(config, metaDir, snapshot, tableScope, db) : null const policy = { failOnPending: strict ? true : config.check?.failOnPending ?? true, @@ -187,5 +187,5 @@ async function cmdCheck(ctx: CommandRunContext): Promise { console.log(`Failed checks: ${failedChecks.join(', ')}`) process.exitCode = 1 } - }) + } } diff --git a/packages/cli/src/bin/commands/drift.ts b/packages/cli/src/bin/commands/drift.ts index 48e0df4..f1cad00 100644 --- a/packages/cli/src/bin/commands/drift.ts +++ b/packages/cli/src/bin/commands/drift.ts @@ -1,10 +1,9 @@ import { join } from 'node:path' -import { isUnknownDatabaseError, type SchemaObjectRef } from '@chkit/clickhouse' +import { isUnknownDatabaseError, type ClickHouseExecutor, type SchemaObjectRef } from '@chkit/clickhouse' import type { ChxConfig, Snapshot, TableDefinition } from '@chkit/core' import { typedFlags, type CommandDef, type CommandRunContext } from '../../plugins.js' -import { withClickHouseExecutor } from '../clickhouse-resource.js' import { GLOBAL_FLAGS } from '../global-flags.js' import { compareSchemaObjects, @@ -42,101 +41,102 @@ export async function buildDriftPayload( config: ChxConfig, metaDir: string, snapshot: Snapshot, - scope?: TableScope + scope?: TableScope, + executor?: ClickHouseExecutor ): Promise { const selectedTables = scope?.enabled && scope.matchCount > 0 ? new Set(scope.matchedTables) : undefined - if (!config.clickhouse) throw new Error('clickhouse config is required for drift checks') - return withClickHouseExecutor(config.clickhouse, async (db) => { - let actualObjects: SchemaObjectRef[] - try { - actualObjects = await db.listSchemaObjects() - } catch (error) { - if (isUnknownDatabaseError(error)) { - const allExpected = snapshot.definitions - .filter((def) => { - if (!selectedTables) return true - if (def.kind !== 'table') return true - return selectedTables.has(`${def.database}.${def.name}`) - }) - .map((def) => `${def.database}.${def.name}`) - return { - scope, - snapshotFile: join(metaDir, 'snapshot.json'), - expectedCount: allExpected.length, - actualCount: 0, - drifted: allExpected.length > 0, - databaseMissing: true, - database: config.clickhouse?.database, - missing: allExpected, - extra: [], - kindMismatches: [], - objectDrift: [], - tableDrift: [], - } + if (!executor) throw new Error('clickhouse config is required for drift checks') + const db = executor + + let actualObjects: SchemaObjectRef[] + try { + actualObjects = await db.listSchemaObjects() + } catch (error) { + if (isUnknownDatabaseError(error)) { + const allExpected = snapshot.definitions + .filter((def) => { + if (!selectedTables) return true + if (def.kind !== 'table') return true + return selectedTables.has(`${def.database}.${def.name}`) + }) + .map((def) => `${def.database}.${def.name}`) + return { + scope, + snapshotFile: join(metaDir, 'snapshot.json'), + expectedCount: allExpected.length, + actualCount: 0, + drifted: allExpected.length > 0, + databaseMissing: true, + database: config.clickhouse?.database, + missing: allExpected, + extra: [], + kindMismatches: [], + objectDrift: [], + tableDrift: [], } - throw error } - const expectedObjects = snapshot.definitions - .filter((def) => { - if (!selectedTables) return true - if (def.kind !== 'table') return true - return selectedTables.has(`${def.database}.${def.name}`) - }) - .map((def) => ({ - kind: def.kind, - database: def.database, - name: def.name, - })) - const expectedDatabases = new Set(expectedObjects.map((def) => def.database)) - const actualInScope = actualObjects.filter((item) => expectedDatabases.has(item.database)) - - const { missing, extra, kindMismatches, objectDrift } = compareSchemaObjects( - expectedObjects, - actualInScope - ) - - const expectedTables = snapshot.definitions.filter((def): def is TableDefinition => { - if (def.kind !== 'table') return false + throw error + } + const expectedObjects = snapshot.definitions + .filter((def) => { if (!selectedTables) return true + if (def.kind !== 'table') return true return selectedTables.has(`${def.database}.${def.name}`) }) - const expectedTableMap = new Map( - expectedTables.map((table) => [`${table.database}.${table.name}`, table]) - ) - const actualTables = await db.listTableDetails([...expectedDatabases]) - const tableDrift = actualTables - .map((actual) => { - const expected = expectedTableMap.get(`${actual.database}.${actual.name}`) - if (!expected) return null - return compareTableShape(expected, actual) - }) - .filter((item): item is NonNullable => item !== null) - .sort((a, b) => a.table.localeCompare(b.table)) + .map((def) => ({ + kind: def.kind, + database: def.database, + name: def.name, + })) + const expectedDatabases = new Set(expectedObjects.map((def) => def.database)) + const actualInScope = actualObjects.filter((item) => expectedDatabases.has(item.database)) - const drifted = - missing.length > 0 || - extra.length > 0 || - kindMismatches.length > 0 || - objectDrift.length > 0 || - tableDrift.length > 0 - return { - scope, - snapshotFile: join(metaDir, 'snapshot.json'), - expectedCount: expectedObjects.length, - actualCount: actualInScope.length, - drifted, - missing, - extra, - kindMismatches, - objectDrift, - tableDrift, - } + const { missing, extra, kindMismatches, objectDrift } = compareSchemaObjects( + expectedObjects, + actualInScope + ) + + const expectedTables = snapshot.definitions.filter((def): def is TableDefinition => { + if (def.kind !== 'table') return false + if (!selectedTables) return true + return selectedTables.has(`${def.database}.${def.name}`) }) + const expectedTableMap = new Map( + expectedTables.map((table) => [`${table.database}.${table.name}`, table]) + ) + const actualTables = await db.listTableDetails([...expectedDatabases]) + const tableDrift = actualTables + .map((actual) => { + const expected = expectedTableMap.get(`${actual.database}.${actual.name}`) + if (!expected) return null + return compareTableShape(expected, actual) + }) + .filter((item): item is NonNullable => item !== null) + .sort((a, b) => a.table.localeCompare(b.table)) + + const drifted = + missing.length > 0 || + extra.length > 0 || + kindMismatches.length > 0 || + objectDrift.length > 0 || + tableDrift.length > 0 + return { + scope, + snapshotFile: join(metaDir, 'snapshot.json'), + expectedCount: expectedObjects.length, + actualCount: actualInScope.length, + drifted, + missing, + extra, + kindMismatches, + objectDrift, + tableDrift, + } } -async function cmdDrift(ctx: CommandRunContext): Promise { - const { flags, config, dirs } = ctx +async function cmdDrift(runCtx: CommandRunContext): Promise { + const { flags, config, dirs, ctx } = runCtx const f = typedFlags(flags, GLOBAL_FLAGS) const tableSelector = f['--table'] const jsonMode = f['--json'] === true @@ -169,7 +169,10 @@ async function cmdDrift(ctx: CommandRunContext): Promise { console.log(`No tables matched selector "${scope.selector ?? ''}". Drift check is a no-op.`) return } - const payload = await buildDriftPayload(config, metaDir, snapshot, scope) + if (!ctx.hasExecutor) { + throw new Error('clickhouse config is required for drift checks') + } + const payload = await buildDriftPayload(config, metaDir, snapshot, scope, ctx.executor) if (jsonMode) { emitJson('drift', payload) diff --git a/packages/cli/src/bin/commands/migrate.ts b/packages/cli/src/bin/commands/migrate.ts index 282add7..4aba62b 100644 --- a/packages/cli/src/bin/commands/migrate.ts +++ b/packages/cli/src/bin/commands/migrate.ts @@ -5,7 +5,6 @@ import { createInterface } from 'node:readline/promises' import { defineFlags, typedFlags, type CommandDef, type CommandRunContext } from '../../plugins.js' import { waitForDDLPropagation } from '@chkit/clickhouse' -import { withClickHouseExecutor } from '../clickhouse-resource.js' import { GLOBAL_FLAGS } from '../global-flags.js' import { emitJson } from '../json-output.js' import { createJournalStore } from '../journal-store.js' @@ -95,8 +94,8 @@ async function filterPendingByScope( return inScope } -async function cmdMigrate(ctx: CommandRunContext): Promise { - const { flags, config, configPath, dirs, pluginRuntime } = ctx +async function cmdMigrate(runCtx: CommandRunContext): Promise { + const { flags, config, configPath, dirs, pluginRuntime, ctx } = runCtx const f = typedFlags(flags, [...GLOBAL_FLAGS, ...MIGRATE_FLAGS] as const) const executeRequested = f['--apply'] === true || f['--execute'] === true const allowDestructive = f['--allow-destructive'] === true @@ -105,11 +104,12 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { const { migrationsDir, metaDir } = dirs - if (!config.clickhouse) { + if (!ctx.hasExecutor) { throw new Error('clickhouse config is required for migrate (journal is stored in ClickHouse)') } + const db = ctx.executor - await withClickHouseExecutor(config.clickhouse, async (db) => { + { const journalStore = createJournalStore(db) const snapshot = await readSnapshot(metaDir) const tableScope = resolveTableScope(tableSelector, tableKeysFromDefinitions(snapshot?.definitions ?? [])) @@ -327,5 +327,5 @@ async function cmdMigrate(ctx: CommandRunContext): Promise { } console.log(`\nMigrations recorded in ClickHouse _chkit_migrations table.`) - }) + } } diff --git a/packages/cli/src/bin/commands/status.ts b/packages/cli/src/bin/commands/status.ts index 7fa190f..c258299 100644 --- a/packages/cli/src/bin/commands/status.ts +++ b/packages/cli/src/bin/commands/status.ts @@ -1,7 +1,6 @@ import { mkdir } from 'node:fs/promises' import type { CommandDef, CommandRunContext } from '../../plugins.js' -import { withClickHouseExecutor } from '../clickhouse-resource.js' import { emitJson } from '../json-output.js' import { createJournalStore } from '../journal-store.js' import { findChecksumMismatches, listMigrations } from '../migration-store.js' @@ -13,60 +12,58 @@ export const statusCommand: CommandDef = { run: cmdStatus, } -async function cmdStatus(ctx: CommandRunContext): Promise { - const { flags, config, dirs } = ctx +async function cmdStatus(runCtx: CommandRunContext): Promise { + const { flags, config, dirs, ctx } = runCtx const jsonMode = flags['--json'] === true const { migrationsDir } = dirs - if (!config.clickhouse) { + if (!ctx.hasExecutor) { throw new Error('clickhouse config is required for status (journal is stored in ClickHouse)') } + const db = ctx.executor + const database = config.clickhouse?.database + const journalStore = createJournalStore(db) - const database = config.clickhouse.database - await withClickHouseExecutor(config.clickhouse, async (db) => { - const journalStore = createJournalStore(db) + await mkdir(migrationsDir, { recursive: true }) + const files = await listMigrations(migrationsDir) + const journal = await journalStore.readJournal() + const appliedNames = new Set(journal.applied.map((entry) => entry.name)) + const pending = files.filter((f) => !appliedNames.has(f)) + const checksumMismatches = await findChecksumMismatches(migrationsDir, journal) - await mkdir(migrationsDir, { recursive: true }) - const files = await listMigrations(migrationsDir) - const journal = await journalStore.readJournal() - const appliedNames = new Set(journal.applied.map((entry) => entry.name)) - const pending = files.filter((f) => !appliedNames.has(f)) - const checksumMismatches = await findChecksumMismatches(migrationsDir, journal) - - const databaseMissing = journalStore.databaseMissing - const payload = { - migrationsDir, - total: files.length, - applied: journal.applied.length, - pending: pending.length, - pendingMigrations: pending, - checksumMismatchCount: checksumMismatches.length, - checksumMismatches, - ...(databaseMissing ? { databaseMissing: true, database } : {}), - } + const databaseMissing = journalStore.databaseMissing + const payload = { + migrationsDir, + total: files.length, + applied: journal.applied.length, + pending: pending.length, + pendingMigrations: pending, + checksumMismatchCount: checksumMismatches.length, + checksumMismatches, + ...(databaseMissing ? { databaseMissing: true, database } : {}), + } - if (jsonMode) { - emitJson('status', payload) - return - } + if (jsonMode) { + emitJson('status', payload) + return + } - if (databaseMissing) { - console.log(`\u26A0 Database "${database}" does not exist on the target server.`) - console.log(' It will be created when you run: chkit migrate --apply\n') - } + if (databaseMissing) { + console.log(`\u26A0 Database "${database ?? ''}" does not exist on the target server.`) + console.log(' It will be created when you run: chkit migrate --apply\n') + } - console.log(`Migrations directory: ${migrationsDir}`) - console.log(`Total migrations: ${files.length}`) - console.log(`Applied: ${journal.applied.length}`) - console.log(`Pending: ${pending.length}`) + console.log(`Migrations directory: ${migrationsDir}`) + console.log(`Total migrations: ${files.length}`) + console.log(`Applied: ${journal.applied.length}`) + console.log(`Pending: ${pending.length}`) - if (pending.length > 0) { - console.log('\nPending migrations:') - for (const item of pending) console.log(`- ${item}`) - } - if (checksumMismatches.length > 0) { - console.log('\nChecksum mismatches on applied migrations:') - for (const item of checksumMismatches) console.log(`- ${item.name}`) - } - }) + if (pending.length > 0) { + console.log('\nPending migrations:') + for (const item of pending) console.log(`- ${item}`) + } + if (checksumMismatches.length > 0) { + console.log('\nChecksum mismatches on applied migrations:') + for (const item of checksumMismatches) console.log(`- ${item.name}`) + } } diff --git a/packages/cli/src/bin/plugin-runtime.ts b/packages/cli/src/bin/plugin-runtime.ts index 89b2133..275df4e 100644 --- a/packages/cli/src/bin/plugin-runtime.ts +++ b/packages/cli/src/bin/plugin-runtime.ts @@ -9,8 +9,10 @@ import { type ResolvedChxConfig, type SchemaDefinition, } from '@chkit/core' +import { createClickHouseExecutor, type ClickHouseExecutor } from '@chkit/clickhouse' import type { + ChxGetContextInput, ChxOnCheckContext, ChxOnCheckResult, ChxOnAfterApplyContext, @@ -25,6 +27,7 @@ import type { ChxPlugin, ChxPluginCommand, ChxPluginCommandContext, + PluginContext, } from '../plugins.js' import { isInlinePluginRegistration } from '../plugins.js' import type { TableScope } from './table-scope.js' @@ -43,6 +46,8 @@ const UNFILTERED_TABLE_SCOPE: TableScope = { export interface PluginRuntime { plugins: ReadonlyArray getCommand(pluginName: string, commandName: string): { command: ChxPluginCommand; plugin: LoadedPlugin } | null + resolveContext(input: Omit): Promise + disposeContext(ctx: PluginContext): Promise runOnInit(context: Omit): Promise runOnComplete(context: Omit & { exitCode?: number }): Promise runOnConfigLoaded( @@ -241,8 +246,59 @@ export async function loadPluginRuntime(input: { return { handled: false } } + const NULL_EXECUTOR: ClickHouseExecutor = (() => { + const err = () => { + throw new Error('No ClickHouse connection configured and no plugin provided an executor') + } + return { + command: err, + query: err, + insert: err, + submit: err, + queryStatus: err, + listSchemaObjects: err, + listTableDetails: err, + close: () => Promise.resolve(), + } + })() + return { plugins: loaded, + async resolveContext(input) { + const hasClickhouseConfig = !!input.config.clickhouse + const defaults: PluginContext = { + executor: hasClickhouseConfig + ? createClickHouseExecutor(input.config.clickhouse!) + : NULL_EXECUTOR, + hasExecutor: hasClickhouseConfig, + } + let ctx = defaults + for (const item of loaded) { + const hook = item.plugin.hooks?.getContext + if (!hook) continue + try { + const result = await hook({ ...input, defaults }) + if (result && typeof result === 'object' && 'executor' in result && result.executor) { + // Plugin returned an executor override — close the default one + if (ctx.executor !== defaults.executor) { + // A previous plugin already overrode — close that one + await ctx.executor.close() + } else if (ctx === defaults && defaults.executor !== NULL_EXECUTOR) { + // First override — close the default executor + await defaults.executor.close() + } + ctx = { ...ctx, ...result, hasExecutor: true } + } + } catch (error) { + await ctx.executor.close() + throw formatPluginError(item.plugin.manifest.name, 'getContext', error) + } + } + return ctx + }, + async disposeContext(ctx) { + await ctx.executor.close() + }, getCommand(pluginName, commandName) { const item = byName.get(pluginName) if (!item) return null diff --git a/packages/cli/src/plugins.ts b/packages/cli/src/plugins.ts index 13df641..0313707 100644 --- a/packages/cli/src/plugins.ts +++ b/packages/cli/src/plugins.ts @@ -7,6 +7,7 @@ import type { ResolvedChxConfig, SchemaDefinition, } from '@chkit/core' +import type { ClickHouseExecutor } from '@chkit/clickhouse' import type { PluginRuntime } from './bin/plugin-runtime.js' import type { TableScope } from './bin/table-scope.js' @@ -21,6 +22,20 @@ export { type ParsedFlags, } from '@chkit/core' +export interface PluginContext { + executor: ClickHouseExecutor + /** True when an executor was provided (either from config or a plugin). False when using the null executor. */ + hasExecutor: boolean +} + +export interface ChxGetContextInput { + config: ResolvedChxConfig + configPath: string + command: string + flags: ParsedFlags + defaults: PluginContext +} + export interface CommandDef { name: string description: string @@ -35,6 +50,7 @@ export interface CommandRunContext { configPath: string dirs: { outDir: string; migrationsDir: string; metaDir: string } pluginRuntime: PluginRuntime + ctx: PluginContext } export interface CommandExtension { @@ -88,6 +104,7 @@ export interface ChxOnAfterApplyContext extends ChxPluginHookContextBase { export interface ChxOnInitContext { command: string + configPath: string isInteractive: boolean jsonMode: boolean options: Record @@ -160,6 +177,9 @@ export interface ChxPluginCommand { } export interface ChxPluginHooks { + getContext?: ( + input: ChxGetContextInput + ) => PluginContext | Partial | void | Promise | void> onInit?: (context: ChxOnInitContext) => void | Promise onComplete?: (context: ChxOnCompleteContext) => void | Promise onBeforePluginCommand?: ( diff --git a/packages/plugin-obsessiondb/src/api-request.ts b/packages/plugin-obsessiondb/src/api-request.ts new file mode 100644 index 0000000..4594348 --- /dev/null +++ b/packages/plugin-obsessiondb/src/api-request.ts @@ -0,0 +1,38 @@ +import type { Credentials } from './auth/index.js' + +export class SessionExpiredError extends Error { + constructor() { + super('Session expired. Run `chkit obsessiondb login` to re-authenticate.') + } +} + +export function isSessionExpiredError(error: unknown): boolean { + return error instanceof SessionExpiredError +} + +export async function apiRequest( + path: string, + creds: Credentials, + body?: unknown +): Promise { + const res = await fetch(`${creds.base_url}${path}`, { + method: body !== undefined ? 'POST' : 'GET', + headers: { + Authorization: `Bearer ${creds.access_token}`, + 'Content-Type': 'application/json', + 'User-Agent': 'chkit-cli', + }, + ...(body !== undefined ? { body: JSON.stringify(body) } : {}), + }) + + if (res.status === 401) { + throw new SessionExpiredError() + } + + if (!res.ok) { + const text = await res.text() + throw new Error(`API error: ${res.status} ${text}`) + } + + return (await res.json()) as T +} diff --git a/packages/plugin-obsessiondb/src/auth/commands.ts b/packages/plugin-obsessiondb/src/auth/commands.ts index f601801..4aef16a 100644 --- a/packages/plugin-obsessiondb/src/auth/commands.ts +++ b/packages/plugin-obsessiondb/src/auth/commands.ts @@ -8,6 +8,7 @@ function resolveBaseUrlFromFlags(flags: Record print: (value: unknown) => void } @@ -31,7 +32,7 @@ export const LOGIN_COMMAND: PluginCommand = { ], async run(context) { const baseUrl = resolveBaseUrlFromFlags(context.flags) - return runLogin(baseUrl, (msg) => context.print(msg)) + return runLogin(baseUrl, context.configPath, (msg) => context.print(msg)) }, } diff --git a/packages/plugin-obsessiondb/src/auth/login.ts b/packages/plugin-obsessiondb/src/auth/login.ts index 06130aa..c43d56a 100644 --- a/packages/plugin-obsessiondb/src/auth/login.ts +++ b/packages/plugin-obsessiondb/src/auth/login.ts @@ -2,7 +2,10 @@ import { execFile } from 'node:child_process' import { platform } from 'node:os' import { getSession, pollDeviceToken, requestDeviceCode } from './api-client.js' -import { clearCredentials, loadCredentials, saveCredentials } from './credentials.js' +import { clearCredentials, loadCredentials, resolveBaseUrl, saveCredentials } from './credentials.js' +import { listServices } from '../service/api.js' +import { selectServiceInteractive } from '../service/select.js' +import { loadSelectedService, saveSelectedService } from '../service/storage.js' function openBrowser(url: string): void { const cmd = platform() === 'darwin' ? 'open' : platform() === 'win32' ? 'start' : 'xdg-open' @@ -11,12 +14,46 @@ function openBrowser(url: string): void { }) } -export async function runLogin(baseUrl: string, print: (msg: string) => void): Promise { +async function promptServiceSelection( + baseUrl: string, + token: string, + configPath: string, + print: (msg: string) => void +): Promise { + const effectiveCreds = { access_token: token, base_url: resolveBaseUrl(baseUrl) } + try { + const services = await listServices(effectiveCreds) + const selected = await selectServiceInteractive(services, print) + if (selected) { + await saveSelectedService(configPath, { + service_id: selected.id, + service_name: selected.name, + }) + print(`Service selected: ${selected.name}`) + } + } catch (error) { + const msg = error instanceof Error ? error.message : String(error) + print(`Could not fetch services: ${msg}`) + print('You can select a service later with: chkit obsessiondb select-service') + } +} + +export async function runLogin( + baseUrl: string, + configPath: string, + print: (msg: string) => void +): Promise { const existing = await loadCredentials() if (existing) { try { const session = await getSession(existing.base_url, existing.access_token) print(`Already logged in as ${session.user.email}`) + + // Check if service is selected; if not, prompt + const service = await loadSelectedService(configPath) + if (!service) { + await promptServiceSelection(existing.base_url, existing.access_token, configPath, print) + } return 0 } catch { // Token expired or invalid — proceed with fresh login @@ -40,6 +77,9 @@ export async function runLogin(baseUrl: string, print: (msg: string) => void): P const session = await getSession(baseUrl, token) print(`Logged in as ${session.user.email}`) + // Prompt for service selection after successful auth + await promptServiceSelection(baseUrl, token, configPath, print) + return 0 } diff --git a/packages/plugin-obsessiondb/src/backfill/api-client.ts b/packages/plugin-obsessiondb/src/backfill/api-client.ts index 308a48c..12e6e9a 100644 --- a/packages/plugin-obsessiondb/src/backfill/api-client.ts +++ b/packages/plugin-obsessiondb/src/backfill/api-client.ts @@ -1,4 +1,7 @@ import type { Credentials } from '../auth/index.js' +import { apiRequest, isSessionExpiredError } from '../api-request.js' + +export { isSessionExpiredError } export interface RemotePlanResponse { ok: boolean @@ -33,43 +36,6 @@ export interface RemoteDoctorResponse { [key: string]: unknown } -class SessionExpiredError extends Error { - constructor() { - super('Session expired. Run `chkit obsessiondb login` to re-authenticate.') - } -} - -async function apiRequest( - path: string, - creds: Credentials, - body?: unknown -): Promise { - const res = await fetch(`${creds.base_url}${path}`, { - method: body !== undefined ? 'POST' : 'GET', - headers: { - Authorization: `Bearer ${creds.access_token}`, - 'Content-Type': 'application/json', - 'User-Agent': 'chkit-cli', - }, - ...(body !== undefined ? { body: JSON.stringify(body) } : {}), - }) - - if (res.status === 401) { - throw new SessionExpiredError() - } - - if (!res.ok) { - const text = await res.text() - throw new Error(`Remote backfill API error: ${res.status} ${text}`) - } - - return (await res.json()) as T -} - -export function isSessionExpiredError(error: unknown): boolean { - return error instanceof SessionExpiredError -} - export async function submitBackfillPlan( input: Record, creds: Credentials diff --git a/packages/plugin-obsessiondb/src/index.ts b/packages/plugin-obsessiondb/src/index.ts index 15077db..c25122c 100644 --- a/packages/plugin-obsessiondb/src/index.ts +++ b/packages/plugin-obsessiondb/src/index.ts @@ -4,8 +4,11 @@ import type { SchemaDefinition, } from '@chkit/core' -import { AUTH_COMMANDS, loadCredentials } from './auth/index.js' +import { AUTH_COMMANDS, loadCredentials, resolveBaseUrl } from './auth/index.js' import { BACKFILL_EXTEND_COMMANDS, handleBackfillCommand } from './backfill/index.js' +import { createRemoteExecutor } from './query/remote-executor.js' +import { SELECT_SERVICE_COMMAND } from './service/commands.js' +import { loadSelectedService } from './service/storage.js' export type ObsessionDBPluginOptions = Record @@ -33,6 +36,14 @@ interface BeforePluginCommandResult { exitCode?: number } +interface GetContextInput { + config: ResolvedChxConfig + configPath: string + command: string + flags: Record + defaults: Record +} + interface ObsessionDBPlugin { manifest: { name: 'obsessiondb'; apiVersion: 1 } commands: PluginCommand[] @@ -45,7 +56,8 @@ interface ObsessionDBPlugin { }> }> hooks: { - onInit(context: { command: string; isInteractive: boolean; jsonMode: boolean }): Promise + getContext(input: GetContextInput): Promise<{ executor: unknown } | void> + onInit(context: { command: string; configPath: string; isInteractive: boolean; jsonMode: boolean }): Promise onSchemaLoaded(context: { config: ResolvedChxConfig flags: Record @@ -138,7 +150,7 @@ export function rewriteSharedEngines(definitions: SchemaDefinition[]): { function createObsessionDBPlugin(_options: ObsessionDBPluginOptions): ObsessionDBPlugin { return { manifest: { name: 'obsessiondb', apiVersion: 1 }, - commands: AUTH_COMMANDS as unknown as PluginCommand[], + commands: [...AUTH_COMMANDS, SELECT_SERVICE_COMMAND] as unknown as PluginCommand[], extendCommands: [ { command: ['generate', 'migrate', 'status', 'drift', 'check'], @@ -158,12 +170,31 @@ function createObsessionDBPlugin(_options: ObsessionDBPluginOptions): ObsessionD ...BACKFILL_EXTEND_COMMANDS, ], hooks: { + async getContext({ configPath }) { + const creds = await loadCredentials() + if (!creds) return + const service = await loadSelectedService(configPath) + if (!service) return + const effectiveCreds = { ...creds, base_url: resolveBaseUrl(creds.base_url) } + return { + executor: createRemoteExecutor({ + credentials: effectiveCreds, + serviceId: service.service_id, + }), + } + }, async onInit(context) { if (context.jsonMode) return const creds = await loadCredentials() - if (creds) { + if (!creds) return + const service = await loadSelectedService(context.configPath) + if (service) { + console.log( + `obsessiondb: routing queries through ${service.service_name}`, + ) + } else { console.log( - 'obsessiondb: authenticated, backfill commands will execute remotely (use --local to override)', + 'obsessiondb: authenticated but no service selected (run `chkit obsessiondb select-service`)', ) } }, diff --git a/packages/plugin-obsessiondb/src/query/api-client.ts b/packages/plugin-obsessiondb/src/query/api-client.ts new file mode 100644 index 0000000..24cecfe --- /dev/null +++ b/packages/plugin-obsessiondb/src/query/api-client.ts @@ -0,0 +1,88 @@ +import type { Credentials } from '../auth/index.js' +import { apiRequest } from '../api-request.js' + +function queryPath(serviceId: string, action: string): string { + return `/api/v1/services/${serviceId}/query/${action}` +} + +export async function remoteCommand( + serviceId: string, + sql: string, + creds: Credentials +): Promise { + await apiRequest<{ ok: boolean }>(queryPath(serviceId, 'command'), creds, { sql }) +} + +export async function remoteQuery( + serviceId: string, + sql: string, + creds: Credentials +): Promise { + const res = await apiRequest<{ rows: T[] }>(queryPath(serviceId, 'query'), creds, { sql }) + return res.rows +} + +export async function remoteInsert>( + serviceId: string, + params: { table: string; values: T[] }, + creds: Credentials +): Promise { + await apiRequest<{ ok: boolean }>(queryPath(serviceId, 'insert'), creds, params) +} + +export async function remoteSubmit( + serviceId: string, + sql: string, + creds: Credentials, + queryId?: string +): Promise { + const res = await apiRequest<{ query_id: string }>(queryPath(serviceId, 'submit'), creds, { + sql, + query_id: queryId, + }) + return res.query_id +} + +export async function remoteQueryStatus( + serviceId: string, + queryId: string, + creds: Credentials, + options?: { afterTime?: string } +): Promise<{ + status: 'running' | 'finished' | 'failed' | 'unknown' + readRows?: number + readBytes?: number + writtenRows?: number + writtenBytes?: number + elapsedMs?: number + durationMs?: number + error?: string +}> { + return apiRequest(queryPath(serviceId, 'status'), creds, { + query_id: queryId, + ...options, + }) +} + +export async function remoteListSchemaObjects( + serviceId: string, + creds: Credentials +): Promise> { + const res = await apiRequest<{ + objects: Array<{ kind: 'table' | 'view' | 'materialized_view'; database: string; name: string }> + }>(queryPath(serviceId, 'schema-objects'), creds) + return res.objects +} + +export async function remoteListTableDetails( + serviceId: string, + databases: string[], + creds: Credentials +): Promise { + const res = await apiRequest<{ tables: unknown[] }>( + queryPath(serviceId, 'table-details'), + creds, + { databases } + ) + return res.tables +} diff --git a/packages/plugin-obsessiondb/src/query/remote-executor.ts b/packages/plugin-obsessiondb/src/query/remote-executor.ts new file mode 100644 index 0000000..678c18e --- /dev/null +++ b/packages/plugin-obsessiondb/src/query/remote-executor.ts @@ -0,0 +1,47 @@ +import type { ClickHouseExecutor } from '@chkit/clickhouse' +import type { Credentials } from '../auth/index.js' +import { + remoteCommand, + remoteInsert, + remoteListSchemaObjects, + remoteListTableDetails, + remoteQuery, + remoteQueryStatus, + remoteSubmit, +} from './api-client.js' + +export function createRemoteExecutor(deps: { + credentials: Credentials + serviceId: string +}): ClickHouseExecutor { + const { credentials, serviceId } = deps + + return { + async command(sql) { + await remoteCommand(serviceId, sql, credentials) + }, + async query(sql: string) { + return remoteQuery(serviceId, sql, credentials) + }, + async insert>(params: { table: string; values: T[] }) { + await remoteInsert(serviceId, params, credentials) + }, + async submit(sql, queryId?) { + return remoteSubmit(serviceId, sql, credentials, queryId) + }, + async queryStatus(queryId, options?) { + return remoteQueryStatus(serviceId, queryId, credentials, options) + }, + async listSchemaObjects() { + return remoteListSchemaObjects(serviceId, credentials) + }, + async listTableDetails(databases) { + return remoteListTableDetails(serviceId, databases, credentials) as ReturnType< + ClickHouseExecutor['listTableDetails'] + > + }, + async close() { + // No-op — remote executor has no persistent connection + }, + } +} diff --git a/packages/plugin-obsessiondb/src/service/api.ts b/packages/plugin-obsessiondb/src/service/api.ts new file mode 100644 index 0000000..15e5803 --- /dev/null +++ b/packages/plugin-obsessiondb/src/service/api.ts @@ -0,0 +1,8 @@ +import type { Credentials } from '../auth/index.js' +import { apiRequest } from '../api-request.js' +import type { Service } from './types.js' + +export async function listServices(creds: Credentials): Promise { + const res = await apiRequest<{ services: Service[] }>('/api/v1/services', creds) + return res.services +} diff --git a/packages/plugin-obsessiondb/src/service/commands.ts b/packages/plugin-obsessiondb/src/service/commands.ts new file mode 100644 index 0000000..55a054b --- /dev/null +++ b/packages/plugin-obsessiondb/src/service/commands.ts @@ -0,0 +1,44 @@ +import { loadCredentials, resolveBaseUrl } from '../auth/index.js' +import { listServices } from './api.js' +import { selectServiceInteractive } from './select.js' +import { saveSelectedService } from './storage.js' + +interface PluginCommandContext { + configPath: string + flags: Record + print: (value: unknown) => void +} + +interface PluginCommand { + name: string + description: string + run: (context: PluginCommandContext) => Promise +} + +export const SELECT_SERVICE_COMMAND: PluginCommand = { + name: 'select-service', + description: 'Select an ObsessionDB service for query routing', + async run(context) { + const print = (msg: string) => context.print(msg) + + const creds = await loadCredentials() + if (!creds) { + print('Not logged in. Run `chkit obsessiondb login` to authenticate.') + return 1 + } + + const effectiveCreds = { ...creds, base_url: resolveBaseUrl(creds.base_url) } + + const services = await listServices(effectiveCreds) + const selected = await selectServiceInteractive(services, print) + if (!selected) return 1 + + await saveSelectedService(context.configPath, { + service_id: selected.id, + service_name: selected.name, + }) + + print(`Service selected: ${selected.name}`) + return 0 + }, +} diff --git a/packages/plugin-obsessiondb/src/service/select.ts b/packages/plugin-obsessiondb/src/service/select.ts new file mode 100644 index 0000000..1f1588d --- /dev/null +++ b/packages/plugin-obsessiondb/src/service/select.ts @@ -0,0 +1,37 @@ +import { createInterface } from 'node:readline/promises' + +import type { Service } from './types.js' + +export async function selectServiceInteractive( + services: Service[], + print: (msg: string) => void +): Promise { + if (services.length === 0) { + print('No services found.') + return null + } + + if (services.length === 1) { + const service = services[0]! + print(`Auto-selected service: ${service.name}${service.region ? ` (${service.region})` : ''}`) + return service + } + + print('\nAvailable services:') + for (const [i, service] of services.entries()) { + print(` ${i + 1}. ${service.name}${service.region ? ` (${service.region})` : ''}`) + } + + const rl = createInterface({ input: process.stdin, output: process.stdout }) + try { + const answer = await rl.question(`\nSelect service [1-${services.length}]: `) + const index = parseInt(answer.trim(), 10) - 1 + if (isNaN(index) || index < 0 || index >= services.length) { + print('Invalid selection.') + return null + } + return services[index]! + } finally { + rl.close() + } +} diff --git a/packages/plugin-obsessiondb/src/service/storage.ts b/packages/plugin-obsessiondb/src/service/storage.ts new file mode 100644 index 0000000..10be45a --- /dev/null +++ b/packages/plugin-obsessiondb/src/service/storage.ts @@ -0,0 +1,43 @@ +import { mkdir, readFile, writeFile, unlink } from 'node:fs/promises' +import { dirname, join, resolve } from 'node:path' + +import type { SelectedService } from './types.js' + +export function getServicePath(configPath: string): string { + const configDir = resolve(configPath, '..') + return join(configDir, '.chkit', 'obsessiondb.json') +} + +export async function loadSelectedService(configPath: string): Promise { + try { + const raw = await readFile(getServicePath(configPath), 'utf8') + const parsed = JSON.parse(raw) as unknown + if ( + typeof parsed === 'object' && + parsed !== null && + 'service_id' in parsed && + 'service_name' in parsed && + typeof (parsed as SelectedService).service_id === 'string' && + typeof (parsed as SelectedService).service_name === 'string' + ) { + return parsed as SelectedService + } + return null + } catch { + return null + } +} + +export async function saveSelectedService(configPath: string, service: SelectedService): Promise { + const filePath = getServicePath(configPath) + await mkdir(dirname(filePath), { recursive: true }) + await writeFile(filePath, JSON.stringify(service, null, 2) + '\n') +} + +export async function clearSelectedService(configPath: string): Promise { + try { + await unlink(getServicePath(configPath)) + } catch { + // Already gone — no-op + } +} diff --git a/packages/plugin-obsessiondb/src/service/types.ts b/packages/plugin-obsessiondb/src/service/types.ts new file mode 100644 index 0000000..c7d30e0 --- /dev/null +++ b/packages/plugin-obsessiondb/src/service/types.ts @@ -0,0 +1,10 @@ +export interface Service { + id: string + name: string + region?: string +} + +export interface SelectedService { + service_id: string + service_name: string +}