Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/db/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ function isProcessAlive(pid: number): boolean {
}
}

function acquireAdvisoryLock(dbPath: string): void {
export function acquireAdvisoryLock(dbPath: string): void {
const lockPath = `${dbPath}.lock`;
try {
if (fs.existsSync(lockPath)) {
Expand All @@ -129,7 +129,7 @@ function acquireAdvisoryLock(dbPath: string): void {
}
}

function releaseAdvisoryLock(lockPath: string): void {
export function releaseAdvisoryLock(lockPath: string): void {
try {
const content = fs.readFileSync(lockPath, 'utf-8').trim();
if (Number(content) === process.pid) {
Expand Down
2 changes: 2 additions & 0 deletions src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

export type { LockedDatabase, LockedDatabasePair } from './connection.js';
export {
acquireAdvisoryLock,
closeDb,
closeDbDeferred,
closeDbPair,
Expand All @@ -13,6 +14,7 @@ export {
openReadonlyOrFail,
openReadonlyWithNative,
openRepo,
releaseAdvisoryLock,
} from './connection.js';
export { getBuildMeta, initSchema, MIGRATIONS, setBuildMeta } from './migrations.js';
export {
Expand Down
2 changes: 2 additions & 0 deletions src/domain/graph/builder/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ export class PipelineContext {
nativeDb?: NativeDatabase;
/** Whether native engine is available (deferred — DB opened only when needed). */
nativeAvailable: boolean = false;
/** True when ctx.db is a NativeDbProxy — single rusqlite connection for the entire pipeline. */
nativeFirstProxy: boolean = false;

// ── File collection (set by collectFiles stage) ────────────────────
allFiles!: string[];
Expand Down
98 changes: 98 additions & 0 deletions src/domain/graph/builder/native-db-proxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* NativeDbProxy — wraps a NativeDatabase (rusqlite via napi-rs) to satisfy the
* BetterSqlite3Database interface. When the native addon is available, the
* build pipeline uses this proxy as `ctx.db` so that every stage operates on a
* single rusqlite connection — no dual-connection WAL corruption, no
* open/close/reopen dance.
*
* When native is unavailable, the pipeline falls back to real better-sqlite3.
*/

import type { BetterSqlite3Database, NativeDatabase, SqliteStatement } from '../../../types.js';

/** Sanitize params for napi-rs: better-sqlite3 treats `undefined` as NULL,
* but serde_json cannot represent `undefined`. Replace with `null`. */
function sanitize(params: unknown[]): Array<string | number | null> {
return params.map((p) => (p === undefined ? null : p)) as Array<string | number | null>;
}

export class NativeDbProxy implements BetterSqlite3Database {
readonly #ndb: NativeDatabase;
/** Advisory lock path — set by the pipeline so closeDb() can release it. */
__lockPath?: string;

constructor(nativeDb: NativeDatabase) {
this.#ndb = nativeDb;
}

prepare<TRow = unknown>(sql: string): SqliteStatement<TRow> {
const ndb = this.#ndb;
const stmt: SqliteStatement<TRow> = {
all(...params: unknown[]): TRow[] {
return ndb.queryAll(sql, sanitize(params)) as TRow[];
},
get(...params: unknown[]): TRow | undefined {
return (ndb.queryGet(sql, sanitize(params)) ?? undefined) as TRow | undefined;
},
run(...params: unknown[]): { changes: number; lastInsertRowid: number | bigint } {
ndb.queryAll(sql, sanitize(params));
// Retrieve last_insert_rowid via SQLite scalar function so callers
// that depend on it (e.g. CFG block edge mapping) get correct values.
const row = ndb.queryGet('SELECT last_insert_rowid() AS rid', []) as { rid: number } | null;
return { changes: 0, lastInsertRowid: row?.rid ?? 0 };
},
Comment on lines +37 to +43
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 run() stub silently drops changes and lastInsertRowid

RUN_STUB always returns { changes: 0, lastInsertRowid: 0 }, so any caller that inspects these fields — e.g. if (stmt.run(...).changes === 0) throw … — will silently get wrong data. No pipeline stage currently consumes these values, but the proxy advertises full BetterSqlite3Database compatibility, which could surprise future contributors.

Consider delegating to a dedicated exec-style method on NativeDatabase (if one exists or is added) that returns actual rowcount/rowid, or at minimum document the known limitation at the call site.

Suggested change
run(...params: unknown[]): { changes: number; lastInsertRowid: number | bigint } {
ndb.queryAll(sql, params as Array<string | number | null>);
return RUN_STUB;
},
run(...params: unknown[]): { changes: number; lastInsertRowid: number | bigint } {
// NOTE: changes and lastInsertRowid are not available via queryAll —
// callers that rely on these values must use the native fast-paths directly.
ndb.queryAll(sql, params as Array<string | number | null>);
return RUN_STUB;
},

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a NOTE comment on the run() stub clarifying that changes and lastInsertRowid are not available via queryAll and callers relying on these values must use native fast-paths directly. This documents the known limitation at the call site as suggested.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up: the run() stub was actually causing a real bug — undefined params from callers like block.label threw serde_json::Value errors in napi-rs, and the zero-valued lastInsertRowid broke CFG block-edge mapping. Fixed in de237e4: added sanitize() to coerce undefined to null, and replaced the static stub with a real SELECT last_insert_rowid() query.

iterate(): IterableIterator<TRow> {
throw new Error('iterate() is not supported via NativeDbProxy');
},
raw(): SqliteStatement<TRow> {
return stmt; // no-op — .raw() is not used in the build pipeline
},
};
return stmt;
}

exec(sql: string): this {
this.#ndb.exec(sql);
return this;
}

pragma(sql: string): unknown {
return this.#ndb.pragma(sql);
}

close(): void {
// No-op: the pipeline manages the NativeDatabase lifecycle directly.
// closeDbPair() calls nativeDb.close() separately.
}

get open(): boolean {
return this.#ndb.isOpen;
}

get name(): string {
return this.#ndb.dbPath;
}

transaction<F extends (...args: any[]) => any>(
fn: F,
): (...args: F extends (...a: infer A) => unknown ? A : never) => ReturnType<F> {
const ndb = this.#ndb;
return ((...args: unknown[]) => {
// NOTE: nested transactions (savepoints) are not supported — ensure callers
// do not invoke a transaction() wrapper from within an existing transaction.
ndb.exec('BEGIN');
try {
const result = fn(...args);
ndb.exec('COMMIT');
return result;
} catch (e) {
try {
ndb.exec('ROLLBACK');
} catch {
// Ignore rollback errors — the original error is more important
}
throw e;
}
}) as any;
}
Comment on lines +76 to +97
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 transaction() doesn't support nested transactions

better-sqlite3's .transaction() automatically promotes to a savepoint when called inside an existing transaction, so nesting is safe. This implementation always issues BEGIN, which SQLite will reject with "cannot start a transaction within a transaction" if a transaction is already active.

The current pipeline stages (detectChanges, insertNodes, buildEdges, buildStructure) each call db.transaction(...) at the top level and not recursively, so this isn't a bug today. But since the proxy claims the same interface, a future stage that wraps an existing helper in a transaction will break in native-first mode without an obvious error message. Consider adding a comment documenting this limitation.

Suggested change
transaction<F extends (...args: any[]) => any>(
fn: F,
): (...args: F extends (...a: infer A) => unknown ? A : never) => ReturnType<F> {
const ndb = this.#ndb;
return ((...args: unknown[]) => {
ndb.exec('BEGIN');
try {
const result = fn(...args);
ndb.exec('COMMIT');
return result;
} catch (e) {
try {
ndb.exec('ROLLBACK');
} catch {
// Ignore rollback errors — the original error is more important
}
throw e;
}
}) as any;
}
transaction<F extends (...args: any[]) => any>(
fn: F,
): (...args: F extends (...a: infer A) => unknown ? A : never) => ReturnType<F> {
const ndb = this.#ndb;
return ((...args: unknown[]) => {
// NOTE: nested transactions (savepoints) are not supported — ensure callers
// do not invoke a transaction() wrapper from within an existing transaction.
ndb.exec('BEGIN');
try {
const result = fn(...args);
ndb.exec('COMMIT');
return result;
} catch (e) {
try {
ndb.exec('ROLLBACK');
} catch {
// Ignore rollback errors — the original error is more important
}
throw e;
}
}) as any;
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a NOTE comment on the transaction() method documenting that nested transactions (savepoints) are not supported and callers must not invoke a transaction() wrapper from within an existing transaction. This makes the behavioral divergence from better-sqlite3 explicit as suggested.

}
107 changes: 87 additions & 20 deletions src/domain/graph/builder/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
* This is the heart of the builder refactor (ROADMAP 3.9): the monolithic buildGraph()
* is decomposed into independently testable stages that communicate via PipelineContext.
*/
import fs from 'node:fs';
import path from 'node:path';
import { performance } from 'node:perf_hooks';
import {
acquireAdvisoryLock,
closeDbPair,
getBuildMeta,
initSchema,
MIGRATIONS,
openDb,
releaseAdvisoryLock,
setBuildMeta,
} from '../../../db/index.js';
import { detectWorkspaces, loadConfig } from '../../../infrastructure/config.js';
Expand All @@ -25,6 +28,7 @@ import { getActiveEngine } from '../../parser.js';
import { setWorkspaces } from '../resolve.js';
import { PipelineContext } from './context.js';
import { loadPathAliases } from './helpers.js';
import { NativeDbProxy } from './native-db-proxy.js';
import { buildEdges } from './stages/build-edges.js';
import { buildStructure } from './stages/build-structure.js';
// Pipeline stages
Expand Down Expand Up @@ -110,14 +114,48 @@ function loadAliases(ctx: PipelineContext): void {
function setupPipeline(ctx: PipelineContext): void {
ctx.rootDir = path.resolve(ctx.rootDir);
ctx.dbPath = path.join(ctx.rootDir, '.codegraph', 'graph.db');
ctx.db = openDb(ctx.dbPath);
initSchema(ctx.db);

// Detect whether native engine is available, but defer opening NativeDatabase.
// The native orchestrator opens it on demand; the JS pipeline defers until
// after change detection — avoiding ~5ms open+initSchema+close on no-op rebuilds.
// Detect whether native engine is available.
const enginePref = ctx.opts.engine || 'auto';
ctx.nativeAvailable = enginePref !== 'wasm' && !!loadNative()?.NativeDatabase;
const native = enginePref !== 'wasm' ? loadNative() : null;
ctx.nativeAvailable = !!native?.NativeDatabase;

// Native-first: use only rusqlite for the entire pipeline (no better-sqlite3).
// This eliminates the dual-connection WAL corruption problem and enables all
// native fast-paths (bulkInsertNodes, classifyRolesFull, etc.).
// Fallback: if native is unavailable or FORCE_JS is set, use better-sqlite3.
if (
ctx.nativeAvailable &&
native?.NativeDatabase &&
process.env.CODEGRAPH_FORCE_JS_PIPELINE !== '1'
) {
try {
const dir = path.dirname(ctx.dbPath);
if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true });
acquireAdvisoryLock(ctx.dbPath);
ctx.nativeDb = native.NativeDatabase.openReadWrite(ctx.dbPath);
ctx.nativeDb.initSchema();
const proxy = new NativeDbProxy(ctx.nativeDb);
proxy.__lockPath = `${ctx.dbPath}.lock`;
ctx.db = proxy as unknown as typeof ctx.db;
ctx.nativeFirstProxy = true;
} catch (err) {
warn(`NativeDatabase setup failed, falling back to better-sqlite3: ${toErrorMessage(err)}`);
try {
ctx.nativeDb?.close();
} catch {
/* ignore */
}
ctx.nativeDb = undefined;
ctx.nativeFirstProxy = false;
releaseAdvisoryLock(`${ctx.dbPath}.lock`);
ctx.db = openDb(ctx.dbPath);
initSchema(ctx.db);
}
} else {
ctx.db = openDb(ctx.dbPath);
initSchema(ctx.db);
}

ctx.config = loadConfig(ctx.rootDir);
ctx.incremental =
Expand Down Expand Up @@ -434,16 +472,20 @@ async function runPostNativeAnalysis(
analysisFileSymbols = allFileSymbols;
}

// Reopen nativeDb for analysis features (suspend/resume WAL pattern).
const native = loadNative();
if (native?.NativeDatabase) {
try {
ctx.nativeDb = native.NativeDatabase.openReadWrite(ctx.dbPath);
if (ctx.engineOpts) ctx.engineOpts.nativeDb = ctx.nativeDb;
} catch {
ctx.nativeDb = undefined;
if (ctx.engineOpts) ctx.engineOpts.nativeDb = undefined;
// In native-first mode, nativeDb is already open — no reopen needed.
if (!ctx.nativeFirstProxy) {
const native = loadNative();
if (native?.NativeDatabase) {
try {
ctx.nativeDb = native.NativeDatabase.openReadWrite(ctx.dbPath);
if (ctx.engineOpts) ctx.engineOpts.nativeDb = ctx.nativeDb;
} catch {
ctx.nativeDb = undefined;
if (ctx.engineOpts) ctx.engineOpts.nativeDb = undefined;
}
}
} else if (ctx.engineOpts) {
ctx.engineOpts.nativeDb = ctx.nativeDb;
}

try {
Expand All @@ -463,8 +505,8 @@ async function runPostNativeAnalysis(
warn(`Analysis phases failed after native build: ${toErrorMessage(err)}`);
}

// Close nativeDb after analyses
if (ctx.nativeDb) {
// Close nativeDb after analyses (skip in native-first — single connection stays open)
if (ctx.nativeDb && !ctx.nativeFirstProxy) {
try {
ctx.nativeDb.exec('PRAGMA wal_checkpoint(TRUNCATE)');
} catch {
Expand Down Expand Up @@ -516,8 +558,8 @@ async function tryNativeOrchestrator(
return undefined;
}

// Open NativeDatabase on demand for the orchestrator.
// Deferred from setupPipeline so no-op JS pipeline rebuilds skip the overhead.
// In native-first mode, nativeDb is already open from setupPipeline.
// Otherwise, open it on demand (deferred to skip overhead on no-op rebuilds).
if (!ctx.nativeDb && ctx.nativeAvailable) {
const native = loadNative();
if (native?.NativeDatabase) {
Expand Down Expand Up @@ -591,7 +633,8 @@ async function tryNativeOrchestrator(
const needsStructure = !result.structureHandled;

if (needsAnalysis || needsStructure) {
if (!handoffWalAfterNativeBuild(ctx)) {
// In native-first mode the proxy is already wired — no WAL handoff needed.
if (!ctx.nativeFirstProxy && !handoffWalAfterNativeBuild(ctx)) {
// DB reopen failed — return partial result
return formatNativeTimingResult(p, 0, analysisTiming);
}
Expand Down Expand Up @@ -623,6 +666,30 @@ async function tryNativeOrchestrator(
// ── Pipeline stages execution ───────────────────────────────────────────

async function runPipelineStages(ctx: PipelineContext): Promise<void> {
// ── Native-first mode ────────────────────────────────────────────────
// When ctx.nativeFirstProxy is true, ctx.db is a NativeDbProxy backed by
// the single rusqlite connection (ctx.nativeDb). No dual-connection WAL
// dance is needed — every stage uses the same connection transparently.
if (ctx.nativeFirstProxy) {
// Ensure engineOpts.nativeDb is set so stages can use dedicated native methods.
if (ctx.engineOpts) {
ctx.engineOpts.nativeDb = ctx.nativeDb;
}

await collectFiles(ctx);
await detectChanges(ctx);
if (ctx.earlyExit) return;
await parseFiles(ctx);
await insertNodes(ctx);
await resolveImports(ctx);
await buildEdges(ctx);
await buildStructure(ctx);
await runAnalyses(ctx);
await finalize(ctx);
return;
}

// ── Legacy dual-connection mode (WASM / fallback) ────────────────────
// NativeDatabase is deferred — not opened during setup. collectFiles and
// detectChanges only need better-sqlite3. If no files changed, we exit
// early without ever opening the native connection, saving ~5ms.
Expand Down
31 changes: 17 additions & 14 deletions src/domain/graph/builder/stages/insert-nodes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,26 @@ function tryNativeInsert(ctx: PipelineContext): boolean {
}
const fileHashes = buildFileHashes(allSymbols, precomputedData, metadataUpdates, rootDir);

// WAL guard: same suspendJsDb/resumeJsDb pattern used by feature modules
// (ast, cfg, complexity, dataflow). Checkpoint JS side before native write,
// then checkpoint native side after, so neither library reads WAL frames
// written by the other (#696, #709, #715, #717).
// In native-first mode (single rusqlite connection), no WAL dance is needed.
// In dual-connection mode, checkpoint JS side before native write, then
// checkpoint native side after (#696, #709, #715, #717).
let result: boolean;
try {
if (ctx.db) {
ctx.db.pragma('wal_checkpoint(TRUNCATE)');
}
if (ctx.nativeFirstProxy) {
result = ctx.nativeDb!.bulkInsertNodes(batches, fileHashes, removed);
} finally {
} else {
try {
ctx.nativeDb?.exec('PRAGMA wal_checkpoint(TRUNCATE)');
} catch (e) {
debug(
`tryNativeInsert: WAL checkpoint failed (nativeDb may already be closed): ${toErrorMessage(e)}`,
);
if (ctx.db) {
ctx.db.pragma('wal_checkpoint(TRUNCATE)');
}
result = ctx.nativeDb!.bulkInsertNodes(batches, fileHashes, removed);
} finally {
try {
ctx.nativeDb?.exec('PRAGMA wal_checkpoint(TRUNCATE)');
} catch (e) {
debug(
`tryNativeInsert: WAL checkpoint failed (nativeDb may already be closed): ${toErrorMessage(e)}`,
);
}
}
}
return result;
Expand Down
Loading