diff --git a/src/db/connection.ts b/src/db/connection.ts index bc8ef827..cf797ff7 100644 --- a/src/db/connection.ts +++ b/src/db/connection.ts @@ -109,7 +109,7 @@ function isProcessAlive(pid: number): boolean { } } -function acquireAdvisoryLock(dbPath: string): void { +export function acquireAdvisoryLock(dbPath: string): void { const lockPath = `${dbPath}.lock`; try { if (fs.existsSync(lockPath)) { @@ -129,7 +129,7 @@ function acquireAdvisoryLock(dbPath: string): void { } } -function releaseAdvisoryLock(lockPath: string): void { +export function releaseAdvisoryLock(lockPath: string): void { try { const content = fs.readFileSync(lockPath, 'utf-8').trim(); if (Number(content) === process.pid) { diff --git a/src/db/index.ts b/src/db/index.ts index c65f89e5..f7438211 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -2,6 +2,7 @@ export type { LockedDatabase, LockedDatabasePair } from './connection.js'; export { + acquireAdvisoryLock, closeDb, closeDbDeferred, closeDbPair, @@ -13,6 +14,7 @@ export { openReadonlyOrFail, openReadonlyWithNative, openRepo, + releaseAdvisoryLock, } from './connection.js'; export { getBuildMeta, initSchema, MIGRATIONS, setBuildMeta } from './migrations.js'; export { diff --git a/src/domain/graph/builder/context.ts b/src/domain/graph/builder/context.ts index d9f0c004..f1a0eb0a 100644 --- a/src/domain/graph/builder/context.ts +++ b/src/domain/graph/builder/context.ts @@ -35,6 +35,8 @@ export class PipelineContext { nativeDb?: NativeDatabase; /** Whether native engine is available (deferred — DB opened only when needed). */ nativeAvailable: boolean = false; + /** True when ctx.db is a NativeDbProxy — single rusqlite connection for the entire pipeline. */ + nativeFirstProxy: boolean = false; // ── File collection (set by collectFiles stage) ──────────────────── allFiles!: string[]; diff --git a/src/domain/graph/builder/native-db-proxy.ts b/src/domain/graph/builder/native-db-proxy.ts new file mode 100644 index 00000000..13b4295b --- /dev/null +++ b/src/domain/graph/builder/native-db-proxy.ts @@ -0,0 +1,98 @@ +/** + * NativeDbProxy — wraps a NativeDatabase (rusqlite via napi-rs) to satisfy the + * BetterSqlite3Database interface. When the native addon is available, the + * build pipeline uses this proxy as `ctx.db` so that every stage operates on a + * single rusqlite connection — no dual-connection WAL corruption, no + * open/close/reopen dance. + * + * When native is unavailable, the pipeline falls back to real better-sqlite3. + */ + +import type { BetterSqlite3Database, NativeDatabase, SqliteStatement } from '../../../types.js'; + +/** Sanitize params for napi-rs: better-sqlite3 treats `undefined` as NULL, + * but serde_json cannot represent `undefined`. Replace with `null`. */ +function sanitize(params: unknown[]): Array { + return params.map((p) => (p === undefined ? null : p)) as Array; +} + +export class NativeDbProxy implements BetterSqlite3Database { + readonly #ndb: NativeDatabase; + /** Advisory lock path — set by the pipeline so closeDb() can release it. */ + __lockPath?: string; + + constructor(nativeDb: NativeDatabase) { + this.#ndb = nativeDb; + } + + prepare(sql: string): SqliteStatement { + const ndb = this.#ndb; + const stmt: SqliteStatement = { + all(...params: unknown[]): TRow[] { + return ndb.queryAll(sql, sanitize(params)) as TRow[]; + }, + get(...params: unknown[]): TRow | undefined { + return (ndb.queryGet(sql, sanitize(params)) ?? undefined) as TRow | undefined; + }, + run(...params: unknown[]): { changes: number; lastInsertRowid: number | bigint } { + ndb.queryAll(sql, sanitize(params)); + // Retrieve last_insert_rowid via SQLite scalar function so callers + // that depend on it (e.g. CFG block edge mapping) get correct values. + const row = ndb.queryGet('SELECT last_insert_rowid() AS rid', []) as { rid: number } | null; + return { changes: 0, lastInsertRowid: row?.rid ?? 0 }; + }, + iterate(): IterableIterator { + throw new Error('iterate() is not supported via NativeDbProxy'); + }, + raw(): SqliteStatement { + return stmt; // no-op — .raw() is not used in the build pipeline + }, + }; + return stmt; + } + + exec(sql: string): this { + this.#ndb.exec(sql); + return this; + } + + pragma(sql: string): unknown { + return this.#ndb.pragma(sql); + } + + close(): void { + // No-op: the pipeline manages the NativeDatabase lifecycle directly. + // closeDbPair() calls nativeDb.close() separately. + } + + get open(): boolean { + return this.#ndb.isOpen; + } + + get name(): string { + return this.#ndb.dbPath; + } + + transaction any>( + fn: F, + ): (...args: F extends (...a: infer A) => unknown ? A : never) => ReturnType { + const ndb = this.#ndb; + return ((...args: unknown[]) => { + // NOTE: nested transactions (savepoints) are not supported — ensure callers + // do not invoke a transaction() wrapper from within an existing transaction. + ndb.exec('BEGIN'); + try { + const result = fn(...args); + ndb.exec('COMMIT'); + return result; + } catch (e) { + try { + ndb.exec('ROLLBACK'); + } catch { + // Ignore rollback errors — the original error is more important + } + throw e; + } + }) as any; + } +} diff --git a/src/domain/graph/builder/pipeline.ts b/src/domain/graph/builder/pipeline.ts index e32b18e2..65cc17c9 100644 --- a/src/domain/graph/builder/pipeline.ts +++ b/src/domain/graph/builder/pipeline.ts @@ -4,14 +4,17 @@ * This is the heart of the builder refactor (ROADMAP 3.9): the monolithic buildGraph() * is decomposed into independently testable stages that communicate via PipelineContext. */ +import fs from 'node:fs'; import path from 'node:path'; import { performance } from 'node:perf_hooks'; import { + acquireAdvisoryLock, closeDbPair, getBuildMeta, initSchema, MIGRATIONS, openDb, + releaseAdvisoryLock, setBuildMeta, } from '../../../db/index.js'; import { detectWorkspaces, loadConfig } from '../../../infrastructure/config.js'; @@ -25,6 +28,7 @@ import { getActiveEngine } from '../../parser.js'; import { setWorkspaces } from '../resolve.js'; import { PipelineContext } from './context.js'; import { loadPathAliases } from './helpers.js'; +import { NativeDbProxy } from './native-db-proxy.js'; import { buildEdges } from './stages/build-edges.js'; import { buildStructure } from './stages/build-structure.js'; // Pipeline stages @@ -110,14 +114,48 @@ function loadAliases(ctx: PipelineContext): void { function setupPipeline(ctx: PipelineContext): void { ctx.rootDir = path.resolve(ctx.rootDir); ctx.dbPath = path.join(ctx.rootDir, '.codegraph', 'graph.db'); - ctx.db = openDb(ctx.dbPath); - initSchema(ctx.db); - // Detect whether native engine is available, but defer opening NativeDatabase. - // The native orchestrator opens it on demand; the JS pipeline defers until - // after change detection — avoiding ~5ms open+initSchema+close on no-op rebuilds. + // Detect whether native engine is available. const enginePref = ctx.opts.engine || 'auto'; - ctx.nativeAvailable = enginePref !== 'wasm' && !!loadNative()?.NativeDatabase; + const native = enginePref !== 'wasm' ? loadNative() : null; + ctx.nativeAvailable = !!native?.NativeDatabase; + + // Native-first: use only rusqlite for the entire pipeline (no better-sqlite3). + // This eliminates the dual-connection WAL corruption problem and enables all + // native fast-paths (bulkInsertNodes, classifyRolesFull, etc.). + // Fallback: if native is unavailable or FORCE_JS is set, use better-sqlite3. + if ( + ctx.nativeAvailable && + native?.NativeDatabase && + process.env.CODEGRAPH_FORCE_JS_PIPELINE !== '1' + ) { + try { + const dir = path.dirname(ctx.dbPath); + if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true }); + acquireAdvisoryLock(ctx.dbPath); + ctx.nativeDb = native.NativeDatabase.openReadWrite(ctx.dbPath); + ctx.nativeDb.initSchema(); + const proxy = new NativeDbProxy(ctx.nativeDb); + proxy.__lockPath = `${ctx.dbPath}.lock`; + ctx.db = proxy as unknown as typeof ctx.db; + ctx.nativeFirstProxy = true; + } catch (err) { + warn(`NativeDatabase setup failed, falling back to better-sqlite3: ${toErrorMessage(err)}`); + try { + ctx.nativeDb?.close(); + } catch { + /* ignore */ + } + ctx.nativeDb = undefined; + ctx.nativeFirstProxy = false; + releaseAdvisoryLock(`${ctx.dbPath}.lock`); + ctx.db = openDb(ctx.dbPath); + initSchema(ctx.db); + } + } else { + ctx.db = openDb(ctx.dbPath); + initSchema(ctx.db); + } ctx.config = loadConfig(ctx.rootDir); ctx.incremental = @@ -434,16 +472,20 @@ async function runPostNativeAnalysis( analysisFileSymbols = allFileSymbols; } - // Reopen nativeDb for analysis features (suspend/resume WAL pattern). - const native = loadNative(); - if (native?.NativeDatabase) { - try { - ctx.nativeDb = native.NativeDatabase.openReadWrite(ctx.dbPath); - if (ctx.engineOpts) ctx.engineOpts.nativeDb = ctx.nativeDb; - } catch { - ctx.nativeDb = undefined; - if (ctx.engineOpts) ctx.engineOpts.nativeDb = undefined; + // In native-first mode, nativeDb is already open — no reopen needed. + if (!ctx.nativeFirstProxy) { + const native = loadNative(); + if (native?.NativeDatabase) { + try { + ctx.nativeDb = native.NativeDatabase.openReadWrite(ctx.dbPath); + if (ctx.engineOpts) ctx.engineOpts.nativeDb = ctx.nativeDb; + } catch { + ctx.nativeDb = undefined; + if (ctx.engineOpts) ctx.engineOpts.nativeDb = undefined; + } } + } else if (ctx.engineOpts) { + ctx.engineOpts.nativeDb = ctx.nativeDb; } try { @@ -463,8 +505,8 @@ async function runPostNativeAnalysis( warn(`Analysis phases failed after native build: ${toErrorMessage(err)}`); } - // Close nativeDb after analyses - if (ctx.nativeDb) { + // Close nativeDb after analyses (skip in native-first — single connection stays open) + if (ctx.nativeDb && !ctx.nativeFirstProxy) { try { ctx.nativeDb.exec('PRAGMA wal_checkpoint(TRUNCATE)'); } catch { @@ -516,8 +558,8 @@ async function tryNativeOrchestrator( return undefined; } - // Open NativeDatabase on demand for the orchestrator. - // Deferred from setupPipeline so no-op JS pipeline rebuilds skip the overhead. + // In native-first mode, nativeDb is already open from setupPipeline. + // Otherwise, open it on demand (deferred to skip overhead on no-op rebuilds). if (!ctx.nativeDb && ctx.nativeAvailable) { const native = loadNative(); if (native?.NativeDatabase) { @@ -591,7 +633,8 @@ async function tryNativeOrchestrator( const needsStructure = !result.structureHandled; if (needsAnalysis || needsStructure) { - if (!handoffWalAfterNativeBuild(ctx)) { + // In native-first mode the proxy is already wired — no WAL handoff needed. + if (!ctx.nativeFirstProxy && !handoffWalAfterNativeBuild(ctx)) { // DB reopen failed — return partial result return formatNativeTimingResult(p, 0, analysisTiming); } @@ -623,6 +666,30 @@ async function tryNativeOrchestrator( // ── Pipeline stages execution ─────────────────────────────────────────── async function runPipelineStages(ctx: PipelineContext): Promise { + // ── Native-first mode ──────────────────────────────────────────────── + // When ctx.nativeFirstProxy is true, ctx.db is a NativeDbProxy backed by + // the single rusqlite connection (ctx.nativeDb). No dual-connection WAL + // dance is needed — every stage uses the same connection transparently. + if (ctx.nativeFirstProxy) { + // Ensure engineOpts.nativeDb is set so stages can use dedicated native methods. + if (ctx.engineOpts) { + ctx.engineOpts.nativeDb = ctx.nativeDb; + } + + await collectFiles(ctx); + await detectChanges(ctx); + if (ctx.earlyExit) return; + await parseFiles(ctx); + await insertNodes(ctx); + await resolveImports(ctx); + await buildEdges(ctx); + await buildStructure(ctx); + await runAnalyses(ctx); + await finalize(ctx); + return; + } + + // ── Legacy dual-connection mode (WASM / fallback) ──────────────────── // NativeDatabase is deferred — not opened during setup. collectFiles and // detectChanges only need better-sqlite3. If no files changed, we exit // early without ever opening the native connection, saving ~5ms. diff --git a/src/domain/graph/builder/stages/insert-nodes.ts b/src/domain/graph/builder/stages/insert-nodes.ts index 9d2cd031..f069411b 100644 --- a/src/domain/graph/builder/stages/insert-nodes.ts +++ b/src/domain/graph/builder/stages/insert-nodes.ts @@ -159,23 +159,26 @@ function tryNativeInsert(ctx: PipelineContext): boolean { } const fileHashes = buildFileHashes(allSymbols, precomputedData, metadataUpdates, rootDir); - // WAL guard: same suspendJsDb/resumeJsDb pattern used by feature modules - // (ast, cfg, complexity, dataflow). Checkpoint JS side before native write, - // then checkpoint native side after, so neither library reads WAL frames - // written by the other (#696, #709, #715, #717). + // In native-first mode (single rusqlite connection), no WAL dance is needed. + // In dual-connection mode, checkpoint JS side before native write, then + // checkpoint native side after (#696, #709, #715, #717). let result: boolean; - try { - if (ctx.db) { - ctx.db.pragma('wal_checkpoint(TRUNCATE)'); - } + if (ctx.nativeFirstProxy) { result = ctx.nativeDb!.bulkInsertNodes(batches, fileHashes, removed); - } finally { + } else { try { - ctx.nativeDb?.exec('PRAGMA wal_checkpoint(TRUNCATE)'); - } catch (e) { - debug( - `tryNativeInsert: WAL checkpoint failed (nativeDb may already be closed): ${toErrorMessage(e)}`, - ); + if (ctx.db) { + ctx.db.pragma('wal_checkpoint(TRUNCATE)'); + } + result = ctx.nativeDb!.bulkInsertNodes(batches, fileHashes, removed); + } finally { + try { + ctx.nativeDb?.exec('PRAGMA wal_checkpoint(TRUNCATE)'); + } catch (e) { + debug( + `tryNativeInsert: WAL checkpoint failed (nativeDb may already be closed): ${toErrorMessage(e)}`, + ); + } } } return result;