diff --git a/packages/utils/src/lib/errors.ts b/packages/utils/src/lib/errors.ts index 3ce467bfd..83cc78592 100644 --- a/packages/utils/src/lib/errors.ts +++ b/packages/utils/src/lib/errors.ts @@ -30,3 +30,20 @@ export function stringifyError( } return JSON.stringify(error); } + +/** + * Extends an error with a new message and keeps the original as the cause. + * @param error - The error to extend + * @param message - The new message to add to the error + * @returns A new error with the extended message and the original as cause + */ +export function extendError( + error: unknown, + message: string, + { appendMessage = false } = {}, +) { + const errorMessage = appendMessage + ? `${message}\n${stringifyError(error)}` + : message; + return new Error(errorMessage, { cause: error }); +} diff --git a/packages/utils/src/lib/errors.unit.test.ts b/packages/utils/src/lib/errors.unit.test.ts index 6424819ae..ccb84d2c9 100644 --- a/packages/utils/src/lib/errors.unit.test.ts +++ b/packages/utils/src/lib/errors.unit.test.ts @@ -1,7 +1,7 @@ import ansis from 'ansis'; import { z } from 'zod'; import { SchemaValidationError } from '@code-pushup/models'; -import { stringifyError } from './errors.js'; +import { extendError, stringifyError } from './errors.js'; describe('stringifyError', () => { it('should use only message from plain Error instance', () => { @@ -113,3 +113,25 @@ describe('stringifyError', () => { ).toBe(`SchemaValidationError: Invalid ${ansis.bold('User')} […]`); }); }); + +describe('extendError', () => { + it('adds message, appends original error, and keeps cause', () => { + const original = new Error('boom'); + + const extended = extendError(original, 'wrap failed', { + appendMessage: true, + }); + + expect(extended.message).toBe('wrap failed\nboom'); + expect(extended.cause).toBe(original); + }); + + it('uses only the provided message by default', () => { + const original = new Error('boom'); + + const extended = extendError(original, 'wrap failed'); + + expect(extended.message).toBe('wrap failed'); + expect(extended.cause).toBe(original); + }); +}); diff --git a/packages/utils/src/lib/wal-sharded.int.test.ts b/packages/utils/src/lib/wal-sharded.int.test.ts new file mode 100644 index 000000000..1fa37a36e --- /dev/null +++ b/packages/utils/src/lib/wal-sharded.int.test.ts @@ -0,0 +1,259 @@ +import fs from 'node:fs'; +import path from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { PROFILER_SHARDER_ID_ENV_VAR } from './profiler/constants.js'; +import { ShardedWal } from './wal-sharded.js'; +import { type WalFormat, type WalRecord, stringCodec } from './wal.js'; + +describe('ShardedWal Integration', () => { + const testDir = path.join( + process.cwd(), + 'tmp', + 'int', + 'utils', + 'wal-sharded', + ); + const makeMockFormat = ( + overrides: Partial>, + ): WalFormat => { + const { + baseName = 'wal', + walExtension = '.log', + finalExtension = '.json', + codec = stringCodec(), + finalizer = records => `${JSON.stringify(records)}\n`, + } = overrides; + + return { + baseName, + walExtension, + finalExtension, + codec, + finalizer, + }; + }; + let shardedWal: ShardedWal; + + beforeEach(() => { + if (fs.existsSync(testDir)) { + fs.rmSync(testDir, { recursive: true, force: true }); + } + fs.mkdirSync(testDir, { recursive: true }); + }); + + afterEach(() => { + if (shardedWal) { + shardedWal.cleanupIfCoordinator(); + } + if (fs.existsSync(testDir)) { + fs.rmSync(testDir, { recursive: true, force: true }); + } + }); + + it('should create and finalize shards correctly', () => { + shardedWal = new ShardedWal({ + debug: false, + dir: testDir, + format: makeMockFormat({ + baseName: 'trace', + }), + coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR, + groupId: 'create-finalize', + }); + + const shard1 = shardedWal.shard(); + shard1.open(); + shard1.append('record1'); + shard1.append('record2'); + shard1.close(); + + const shard2 = shardedWal.shard(); + shard2.open(); + shard2.append('record3'); + shard2.close(); + + shardedWal.finalize(); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `trace.create-finalize.json`, + ); + expect(fs.existsSync(finalFile)).toBeTrue(); + + const content = fs.readFileSync(finalFile, 'utf8'); + const records = JSON.parse(content.trim()); + expect(records).toEqual(['record1', 'record2', 'record3']); + }); + + it('should merge multiple shards correctly', () => { + shardedWal = new ShardedWal({ + debug: false, + dir: testDir, + format: makeMockFormat({ + baseName: 'merged', + }), + coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR, + groupId: 'merge-shards', + }); + + // eslint-disable-next-line functional/no-loop-statements + for (let i = 1; i <= 5; i++) { + const shard = shardedWal.shard(); + shard.open(); + shard.append(`record-from-shard-${i}`); + shard.close(); + } + + shardedWal.finalize(); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `merged.merge-shards.json`, + ); + const content = fs.readFileSync(finalFile, 'utf8'); + const records = JSON.parse(content.trim()); + expect(records).toHaveLength(5); + expect(records[0]).toBe('record-from-shard-1'); + expect(records[4]).toBe('record-from-shard-5'); + }); + + it('should handle invalid entries during if debug true', () => { + shardedWal = new ShardedWal({ + debug: true, + dir: testDir, + format: makeMockFormat({ + baseName: 'test', + }), + coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR, + groupId: 'invalid-entries', + }); + + const shard = shardedWal.shard(); + shard.open(); + shard.append('valid1'); + shard.append('invalid'); + shard.append('valid2'); + shard.close(); + + shardedWal.finalize(); + // When debug is true, lastRecover should contain recovery results + expect(shardedWal.stats.lastRecover).toHaveLength(1); + expect(shardedWal.stats.lastRecover[0]).toMatchObject({ + file: expect.stringContaining('test.'), + result: expect.objectContaining({ + records: expect.arrayContaining(['valid1', 'invalid', 'valid2']), + errors: [], + partialTail: null, + }), + }); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `test.invalid-entries.json`, + ); + const content = fs.readFileSync(finalFile, 'utf8'); + const records = JSON.parse(content.trim()); + expect(records).toEqual(['valid1', 'invalid', 'valid2']); + }); + + it('should cleanup shard files after finalization', () => { + shardedWal = new ShardedWal({ + debug: false, + dir: testDir, + format: makeMockFormat({ + baseName: 'cleanup-test', + }), + coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR, + groupId: 'cleanup-test', + }); + + const shard1 = shardedWal.shard(); + shard1.open(); + shard1.append('record1'); + shard1.close(); + + const shard2 = shardedWal.shard(); + shard2.open(); + shard2.append('record2'); + shard2.close(); + + shardedWal.finalize(); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `cleanup-test.cleanup-test.json`, + ); + expect(fs.existsSync(finalFile)).toBeTrue(); + + shardedWal.cleanupIfCoordinator(); + + const groupDir = path.join(testDir, shardedWal.groupId); + const files = fs.readdirSync(groupDir); + expect(files).not.toContain(expect.stringMatching(/cleanup-test.*\.log$/)); + expect(files).toContain(`cleanup-test.cleanup-test.json`); + }); + + it('should use custom options in finalizer', () => { + shardedWal = new ShardedWal({ + debug: false, + dir: testDir, + format: makeMockFormat({ + baseName: 'custom', + finalizer: (records, opt) => + `${JSON.stringify({ records, metadata: opt })}\n`, + }), + coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR, + groupId: 'custom-finalizer', + }); + + const shard = shardedWal.shard(); + shard.open(); + shard.append('record1'); + shard.close(); + + shardedWal.finalize({ version: '2.0', timestamp: Date.now() }); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `custom.custom-finalizer.json`, + ); + const content = fs.readFileSync(finalFile, 'utf8'); + const result = JSON.parse(content.trim()); + expect(result.records).toEqual(['record1']); + expect(result.metadata).toEqual({ + version: '2.0', + timestamp: expect.any(Number), + }); + }); + + it('should handle empty shards correctly', () => { + shardedWal = new ShardedWal({ + debug: false, + dir: testDir, + format: makeMockFormat({ + baseName: 'empty', + }), + coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR, + groupId: 'empty-shards', + }); + + const groupDir = path.join(testDir, shardedWal.groupId); + fs.mkdirSync(groupDir, { recursive: true }); + + shardedWal.finalize(); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `empty.${shardedWal.groupId}.json`, + ); + expect(fs.existsSync(finalFile)).toBeTrue(); + const content = fs.readFileSync(finalFile, 'utf8'); + expect(content.trim()).toBe('[]'); + }); +}); diff --git a/packages/utils/src/lib/wal-sharded.ts b/packages/utils/src/lib/wal-sharded.ts new file mode 100644 index 000000000..3ea89f3a8 --- /dev/null +++ b/packages/utils/src/lib/wal-sharded.ts @@ -0,0 +1,459 @@ +import * as fs from 'node:fs'; +import path from 'node:path'; +import process from 'node:process'; +import { threadId } from 'node:worker_threads'; +import { extendError } from './errors.js'; +import { + type Counter, + getUniqueInstanceId, + getUniqueTimeId, +} from './process-id.js'; +import { + type InvalidEntry, + type RecoverResult, + type WalFormat, + type WalRecord, + WriteAheadLogFile, + filterValidRecords, +} from './wal.js'; + +/** + * NOTE: this helper is only used in this file. The rest of the repo avoids sync methods so it is not reusable. + * Ensures a directory exists, creating it recursively if necessary using sync methods. + * @param dirPath - The directory path to ensure exists + */ +function ensureDirectoryExistsSync(dirPath: string): void { + if (!fs.existsSync(dirPath)) { + fs.mkdirSync(dirPath, { recursive: true }); + } +} + +/** + * Validates that a groupId is safe to use as a single path segment. + * Rejects path traversal attempts and path separators to prevent writing outside intended directory. + * + * @param groupId - The groupId to validate + * @throws Error if groupId contains unsafe characters or path traversal sequences + */ +function validateGroupId(groupId: string): void { + // Reject empty or whitespace-only groupIds + if (!groupId || groupId.trim().length === 0) { + throw new Error('groupId cannot be empty or whitespace-only'); + } + + // Reject path separators (both forward and backward slashes) + if (groupId.includes('/') || groupId.includes('\\')) { + throw new Error('groupId cannot contain path separators (/ or \\)'); + } + + // Reject relative path components + if (groupId === '..' || groupId === '.') { + throw new Error('groupId cannot be "." or ".."'); + } + + // Reject null bytes which can be used to bypass validation + if (groupId.includes('\0')) { + throw new Error('groupId cannot contain null bytes'); + } + + // Validate that the resolved path stays within the intended directory + // This catches cases where the path library normalizes to a parent directory + const normalized = path.normalize(groupId); + if (normalized !== groupId || normalized.startsWith('..')) { + throw new Error( + `groupId normalization resulted in unsafe path: ${normalized}`, + ); + } +} + +// eslint-disable-next-line functional/no-let +let shardCount = 0; + +/** + * Counter for generating sequential shard IDs. + * Encapsulates the shard count increment logic. + */ +export const ShardedWalCounter: Counter = { + next() { + return ++shardCount; + }, +}; + +/** + * Generates a unique readable instance ID. + * This ID uniquely identifies a shard/file per process/thread combination with a human-readable timestamp. + * Format: readable-timestamp.pid.threadId.counter + * Example: "20240101-120000-000.12345.1.1" + * + * @returns A unique ID string with readable timestamp, process ID, thread ID, and counter + */ +export function getShardId(): string { + return `${getUniqueTimeId()}.${process.pid}.${threadId}.${ShardedWalCounter.next()}`; +} + +/** + * Sharded Write-Ahead Log manager for coordinating multiple WAL shards. + * Handles distributed logging across multiple processes/files with atomic finalization. + */ + +export class ShardedWal { + static instanceCount = 0; + + readonly #id: string = getUniqueInstanceId({ + next() { + return ++ShardedWal.instanceCount; + }, + }); + readonly groupId = getUniqueTimeId(); + readonly #debug: boolean = false; + readonly #format: WalFormat; + readonly #dir: string = process.cwd(); + readonly #coordinatorIdEnvVar: string; + #state: 'active' | 'finalized' | 'cleaned' = 'active'; + #lastRecovery: { + file: string; + result: RecoverResult>; + }[] = []; + #createdShardFiles: string[] = []; + + /** + * Initialize the origin PID environment variable if not already set. + * This must be done as early as possible before any user code runs. + * Sets envVarName to the current process ID if not already defined. + * + * @param envVarName - Environment variable name for storing coordinator ID + * @param profilerID - The profiler ID to set as coordinator + */ + static setCoordinatorProcess(envVarName: string, profilerID: string): void { + if (!process.env[envVarName]) { + process.env[envVarName] = profilerID; + } + } + + /** + * Determines if this process is the leader WAL process using the origin PID heuristic. + * + * The leader is the process that first enabled profiling (the one that set CP_PROFILER_ORIGIN_PID). + * All descendant processes inherit the environment but have different PIDs. + * + * @param envVarName - Environment variable name for storing coordinator ID + * @param profilerID - The profiler ID to check + * @returns true if this is the leader WAL process, false otherwise + */ + static isCoordinatorProcess(envVarName: string, profilerID: string): boolean { + return process.env[envVarName] === profilerID; + } + + /** + * Create a sharded WAL manager. + * + * @param opt.dir - Base directory to store shard files (defaults to process.cwd()) + * @param opt.format - WAL format configuration + * @param opt.groupId - Group ID for sharding (defaults to generated group ID) + * @param opt.coordinatorIdEnvVar - Environment variable name for storing coordinator ID (defaults to CP_SHARDED_WAL_COORDINATOR_ID) + * @param opt.autoCoordinator - Whether to auto-set the coordinator ID on construction (defaults to true) + * @param opt.measureNameEnvVar - Environment variable name for coordinating groupId across processes (optional) + */ + constructor(opt: { + debug?: boolean; + dir?: string; + format: WalFormat; + groupId?: string; + coordinatorIdEnvVar: string; + autoCoordinator?: boolean; + measureNameEnvVar?: string; + }) { + const { + dir, + format, + debug, + groupId, + coordinatorIdEnvVar, + autoCoordinator = true, + measureNameEnvVar, + } = opt; + + if (debug != null) { + this.#debug = debug; + } + + // Determine groupId: use provided, then env var, or generate + // eslint-disable-next-line functional/no-let + let resolvedGroupId: string; + if (groupId != null) { + // User explicitly provided groupId - use it (even if empty, validation will catch it) + resolvedGroupId = groupId; + } else if (measureNameEnvVar && process.env[measureNameEnvVar] != null) { + // Env var is set (by coordinator or previous process) - use it + resolvedGroupId = process.env[measureNameEnvVar]; + } else if (measureNameEnvVar) { + // Env var not set - we're likely the first/coordinator, generate and set it + resolvedGroupId = getUniqueTimeId(); + + process.env[measureNameEnvVar] = resolvedGroupId; + } else { + // No measureNameEnvVar provided - generate unique one (backward compatible) + resolvedGroupId = getUniqueTimeId(); + } + + // Validate groupId for path safety before using it + validateGroupId(resolvedGroupId); + + this.groupId = resolvedGroupId; + + if (dir) { + this.#dir = dir; + } + this.#format = format; + this.#coordinatorIdEnvVar = coordinatorIdEnvVar; + + if (autoCoordinator) { + ShardedWal.setCoordinatorProcess(this.#coordinatorIdEnvVar, this.#id); + } + } + + /** + * Gets the unique instance ID for this ShardedWal. + * + * @returns The unique instance ID + */ + get id(): string { + return this.#id; + } + + /** + * Is this instance the coordinator? + * + * Coordinator status is determined from the coordinatorIdEnvVar environment variable. + * The coordinator handles finalization and cleanup of shard files. + * Checks dynamically to allow coordinator to be set after construction. + * + * @returns true if this instance is the coordinator, false otherwise + */ + isCoordinator(): boolean { + return ShardedWal.isCoordinatorProcess(this.#coordinatorIdEnvVar, this.#id); + } + + /** + * Asserts that the WAL is in 'active' state. + * Throws an error if the WAL has been finalized or cleaned. + * + * @throws Error if WAL is not in 'active' state + */ + private assertActive(): void { + if (this.#state !== 'active') { + throw new Error(`WAL is ${this.#state}, cannot modify`); + } + } + + /** + * Gets the current lifecycle state of the WAL. + * + * @returns Current lifecycle state: 'active', 'finalized', or 'cleaned' + */ + getState(): 'active' | 'finalized' | 'cleaned' { + return this.#state; + } + + /** + * Checks if the WAL has been finalized. + * + * @returns true if WAL is in 'finalized' state, false otherwise + */ + isFinalized(): boolean { + return this.#state === 'finalized'; + } + + /** + * Checks if the WAL has been cleaned. + * + * @returns true if WAL is in 'cleaned' state, false otherwise + */ + isCleaned(): boolean { + return this.#state === 'cleaned'; + } + + /** + * Generates a filename for a shard file using a shard ID. + * Both groupId and shardId are already in readable date format. + * + * Example with baseName "trace" and shardId "20240101-120000-000.12345.1.1": + * Filename: trace.20240101-120000-000.12345.1.1.log + * + * @param shardId - The human-readable shard ID (readable-timestamp.pid.threadId.count format) + * @returns The filename for the shard file + */ + getShardedFileName(shardId: string) { + const { baseName, walExtension } = this.#format; + return `${baseName}.${shardId}${walExtension}`; + } + + /** + * Generates a filename for the final merged output file. + * Uses the groupId as the identifier in the final filename. + * + * Example with baseName "trace" and groupId "20240101-120000-000": + * Filename: trace.20240101-120000-000.json + * + * Example with baseName "trace" and groupId "measureName": + * Filename: trace.measureName.json + * + * @returns The filename for the final merged output file + */ + getFinalFilePath() { + const groupIdDir = path.join(this.#dir, this.groupId); + const { baseName, finalExtension } = this.#format; + + return path.join( + groupIdDir, + `${baseName}.${this.groupId}${finalExtension}`, + ); + } + + shard() { + this.assertActive(); + const filePath = path.join( + this.#dir, + this.groupId, + this.getShardedFileName(getShardId()), + ); + this.#createdShardFiles.push(filePath); + return new WriteAheadLogFile({ + file: filePath, + codec: this.#format.codec, + }); + } + + /** Get all shard file paths matching this WAL's base name */ + private shardFiles() { + if (!fs.existsSync(this.#dir)) { + return []; + } + + const groupDir = path.join(this.#dir, this.groupId); + // create dir if not existing + ensureDirectoryExistsSync(groupDir); + + return fs + .readdirSync(groupDir) + .filter(entry => entry.endsWith(this.#format.walExtension)) + .filter(entry => entry.startsWith(`${this.#format.baseName}`)) + .map(entry => path.join(groupDir, entry)); + } + + /** Get shard file paths created by this instance */ + private getCreatedShardFiles() { + return this.#createdShardFiles.filter(f => fs.existsSync(f)); + } + + /** + * Finalize all shards by merging them into a single output file. + * Recovers all records from all shards, validates no errors, and writes merged result. + * Idempotent: returns early if already finalized or cleaned. + * @throws Error if custom finalizer method throws + */ + finalize(opt?: Record) { + if (this.#state !== 'active') { + return; + } + + // Ensure base directory exists before calling shardFiles() + ensureDirectoryExistsSync(this.#dir); + + const fileRecoveries = this.shardFiles().map(f => ({ + file: f, + result: new WriteAheadLogFile({ + file: f, + codec: this.#format.codec, + }).recover(), + })); + + const records = fileRecoveries.flatMap(({ result }) => result.records); + + if (this.#debug) { + this.#lastRecovery = fileRecoveries; + } + + ensureDirectoryExistsSync(path.dirname(this.getFinalFilePath())); + + try { + fs.writeFileSync( + this.getFinalFilePath(), + this.#format.finalizer(filterValidRecords(records), opt), + ); + } catch (error) { + throw extendError( + error, + 'Could not finalize sharded wal. Finalizer method in format throws.', + { appendMessage: true }, + ); + } + + this.#state = 'finalized'; + } + + /** + * Cleanup shard files by removing them from disk. + * Coordinator-only: throws error if not coordinator to prevent race conditions. + * Idempotent: returns early if already cleaned. + */ + cleanup() { + if (!this.isCoordinator()) { + throw new Error('cleanup() can only be called by coordinator'); + } + + if (this.#state === 'cleaned') { + return; + } + + this.shardFiles() + .filter(f => fs.existsSync(f)) + .forEach(f => { + fs.unlinkSync(f); + }); + + this.#state = 'cleaned'; + } + + get stats() { + // When finalized, count all shard files from filesystem (for multi-process scenarios) + // Otherwise, count only files created by this instance + const shardFileCount = + this.#state === 'finalized' || this.#state === 'cleaned' + ? this.shardFiles().length + : this.getCreatedShardFiles().length; + const shardFilesList = + this.#state === 'finalized' || this.#state === 'cleaned' + ? this.shardFiles() + : this.getCreatedShardFiles(); + + return { + lastRecover: this.#lastRecovery, + state: this.#state, + groupId: this.groupId, + shardCount: this.getCreatedShardFiles().length, + isCoordinator: this.isCoordinator(), + isFinalized: this.isFinalized(), + isCleaned: this.isCleaned(), + finalFilePath: this.getFinalFilePath(), + shardFileCount, + shardFiles: shardFilesList, + }; + } + + finalizeIfCoordinator(opt?: Record) { + if (this.isCoordinator()) { + this.finalize(opt); + } + } + + /** + * Cleanup shard files if this instance is the coordinator. + * Safe to call from any process - only coordinator will execute cleanup. + */ + cleanupIfCoordinator() { + if (this.isCoordinator()) { + this.cleanup(); + } + } +} diff --git a/packages/utils/src/lib/wal-sharded.unit.test.ts b/packages/utils/src/lib/wal-sharded.unit.test.ts new file mode 100644 index 000000000..455cb12a1 --- /dev/null +++ b/packages/utils/src/lib/wal-sharded.unit.test.ts @@ -0,0 +1,575 @@ +import { vol } from 'memfs'; +import { beforeEach, describe, expect, it } from 'vitest'; +import { MEMFS_VOLUME, osAgnosticPath } from '@code-pushup/test-utils'; +import { getUniqueInstanceId } from './process-id.js'; +import { PROFILER_SHARDER_ID_ENV_VAR } from './profiler/constants.js'; +import { ShardedWal } from './wal-sharded.js'; +import { + type WalFormat, + WriteAheadLogFile, + parseWalFormat, + stringCodec, +} from './wal.js'; + +const read = (p: string) => vol.readFileSync(p, 'utf8') as string; + +const getShardedWal = (overrides?: { + dir?: string; + format?: Partial; + measureNameEnvVar?: string; + autoCoordinator?: boolean; + groupId?: string; +}) => { + const { format, ...rest } = overrides ?? {}; + return new ShardedWal({ + debug: false, + dir: '/test/shards', + format: parseWalFormat({ + baseName: 'test-wal', + ...format, + }), + coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR, + ...rest, + }); +}; + +describe('ShardedWal', () => { + beforeEach(() => { + vol.reset(); + vol.fromJSON({}, MEMFS_VOLUME); + // Clear coordinator env var for fresh state + // eslint-disable-next-line functional/immutable-data, @typescript-eslint/no-dynamic-delete + delete process.env[PROFILER_SHARDER_ID_ENV_VAR]; + // Clear measure name env var to avoid test pollution + // eslint-disable-next-line functional/immutable-data, @typescript-eslint/no-dynamic-delete + delete process.env.CP_PROFILER_MEASURE_NAME; + }); + + describe('initialization', () => { + it('should create instance with directory and format', () => { + const sw = getShardedWal(); + expect(sw).toBeInstanceOf(ShardedWal); + }); + + it('should expose a stable id via getter', () => { + const sw = getShardedWal(); + const firstId = sw.id; + expect(sw.id).toBe(firstId); + }); + + it('should use groupId from env var when measureNameEnvVar is set', () => { + // eslint-disable-next-line functional/immutable-data + process.env.CP_PROFILER_MEASURE_NAME = 'from-env'; + const sw = getShardedWal({ + measureNameEnvVar: 'CP_PROFILER_MEASURE_NAME', + }); + expect(sw.groupId).toBe('from-env'); + expect(process.env.CP_PROFILER_MEASURE_NAME).toBe('from-env'); + }); + + it('should set env var when measureNameEnvVar is provided and unset', () => { + // eslint-disable-next-line functional/immutable-data + delete process.env.CP_PROFILER_MEASURE_NAME; + const sw = getShardedWal({ + measureNameEnvVar: 'CP_PROFILER_MEASURE_NAME', + }); + expect(process.env.CP_PROFILER_MEASURE_NAME).toBe(sw.groupId); + }); + }); + + describe('path traversal validation', () => { + it('should reject groupId with forward slashes', () => { + expect(() => getShardedWal({ groupId: '../etc/passwd' })).toThrow( + 'groupId cannot contain path separators (/ or \\)', + ); + }); + + it('should reject groupId with backward slashes', () => { + expect(() => getShardedWal({ groupId: '..\\windows\\system32' })).toThrow( + 'groupId cannot contain path separators (/ or \\)', + ); + }); + + it('should reject groupId with parent directory reference', () => { + expect(() => getShardedWal({ groupId: '..' })).toThrow( + 'groupId cannot be "." or ".."', + ); + }); + + it('should reject groupId with current directory reference', () => { + expect(() => getShardedWal({ groupId: '.' })).toThrow( + 'groupId cannot be "." or ".."', + ); + }); + + it('should reject groupId with null bytes', () => { + expect(() => getShardedWal({ groupId: 'test\0malicious' })).toThrow( + 'groupId cannot contain null bytes', + ); + }); + + it('should reject empty groupId', () => { + expect(() => getShardedWal({ groupId: '' })).toThrow( + 'groupId cannot be empty or whitespace-only', + ); + }); + + it('should reject whitespace-only groupId', () => { + expect(() => getShardedWal({ groupId: ' ' })).toThrow( + 'groupId cannot be empty or whitespace-only', + ); + }); + + it('should accept safe alphanumeric groupId', () => { + const sw = getShardedWal({ groupId: 'safe-group-123' }); + expect(sw.groupId).toBe('safe-group-123'); + }); + + it('should accept groupId with underscores and hyphens', () => { + const sw = getShardedWal({ groupId: 'test_group-name' }); + expect(sw.groupId).toBe('test_group-name'); + }); + + it('should reject groupId from env var with path traversal', () => { + // eslint-disable-next-line functional/immutable-data + process.env.CP_PROFILER_MEASURE_NAME = '../malicious'; + expect(() => + getShardedWal({ + measureNameEnvVar: 'CP_PROFILER_MEASURE_NAME', + }), + ).toThrow('groupId cannot contain path separators (/ or \\)'); + }); + }); + + describe('shard management', () => { + it('should create shard with correct file path', () => { + const sw = getShardedWal({ + format: { baseName: 'trace', walExtension: '.log' }, + }); + const shard = sw.shard(); + expect(shard).toBeInstanceOf(WriteAheadLogFile); + // Shard files use getShardId() format (timestamp.pid.threadId.counter) + // The groupId is auto-generated and used in the shard path + // Normalize path before regex matching to handle OS-specific separators + expect(osAgnosticPath(shard.getPath())).toMatch( + /^\/shards\/\d{8}-\d{6}-\d{3}\/trace\.\d{8}-\d{6}-\d{3}(?:\.\d+){3}\.log$/, + ); + expect(shard.getPath()).toEndWithPath('.log'); + }); + + it('should create shard with default shardId when no argument provided', () => { + const sw = getShardedWal({ + format: { baseName: 'trace', walExtension: '.log' }, + }); + const shard = sw.shard(); + expect(shard.getPath()).toStartWithPath( + '/shards/20231114-221320-000/trace.20231114-221320-000.10001', + ); + expect(shard.getPath()).toEndWithPath('.log'); + }); + }); + + describe('file operations', () => { + it('should list no shard files when directory does not exist', () => { + const sw = getShardedWal({ dir: '/nonexistent' }); + const files = (sw as any).shardFiles(); + expect(files).toEqual([]); + }); + + it('should list no shard files when directory is empty', () => { + const sw = getShardedWal({ dir: '/empty' }); + vol.mkdirSync('/empty/20231114-221320-000', { recursive: true }); + const files = (sw as any).shardFiles(); + expect(files).toEqual([]); + }); + + it('should list shard files matching extension', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/trace.19700101-000820-001.1.log': + 'content1', + '/shards/20231114-221320-000/trace.19700101-000820-002.2.log': + 'content2', + '/shards/other.txt': 'not a shard', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'trace', walExtension: '.log' }, + }); + const files = (sw as any).shardFiles(); + + expect(files).toHaveLength(2); + expect(files).toEqual( + expect.arrayContaining([ + expect.pathToMatch( + '/shards/20231114-221320-000/trace.19700101-000820-001.1.log', + ), + expect.pathToMatch( + '/shards/20231114-221320-000/trace.19700101-000820-002.2.log', + ), + ]), + ); + }); + }); + + describe('finalization', () => { + it('should finalize empty shards to empty result', () => { + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'final', + finalExtension: '.json', + finalizer: records => `${JSON.stringify(records)}\n`, + }, + }); + + vol.mkdirSync('/shards/20231114-221320-000', { recursive: true }); + sw.finalize(); + + expect( + read('/shards/20231114-221320-000/final.20231114-221320-000.json'), + ).toBe('[]\n'); + }); + + it('should finalize multiple shards into single file', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/merged.20240101-120000-001.1.log': + 'record1\n', + '/shards/20231114-221320-000/merged.20240101-120000-002.2.log': + 'record2\n', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'merged', + walExtension: '.log', + finalExtension: '.json', + finalizer: records => `${JSON.stringify(records)}\n`, + }, + }); + + sw.finalize(); + + const result = JSON.parse( + read( + '/shards/20231114-221320-000/merged.20231114-221320-000.json', + ).trim(), + ); + expect(result).toEqual(['record1', 'record2']); + }); + + it('should handle invalid entries during finalize', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/final.20240101-120000-001.1.log': + 'valid\n', + '/shards/20231114-221320-000/final.20240101-120000-002.2.log': + 'invalid\n', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'final', + walExtension: '.log', + finalExtension: '.json', + codec: stringCodec(), + finalizer: records => `${JSON.stringify(records)}\n`, + }, + }); + + sw.finalize(); + + const result = JSON.parse( + read( + '/shards/20231114-221320-000/final.20231114-221320-000.json', + ).trim(), + ); + expect(result).toHaveLength(2); + expect(result[0]).toBe('valid'); + expect(result[1]).toBe('invalid'); + }); + + it('should use custom options in finalizer', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/final.20231114-221320-000.10001.2.1.log': + 'record1\n', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'final', + walExtension: '.log', + finalExtension: '.json', + finalizer: (records, opt) => + `${JSON.stringify({ records, meta: opt })}\n`, + }, + }); + + sw.finalize({ version: '1.0', compressed: true }); + + const result = JSON.parse( + read('/shards/20231114-221320-000/final.20231114-221320-000.json'), + ); + expect(result.records).toEqual(['record1']); + expect(result.meta).toEqual({ version: '1.0', compressed: true }); + }); + }); + + describe('cleanup', () => { + it('should throw error when cleanup is called by non-coordinator', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + autoCoordinator: false, + }); + + // Instance won't be coordinator, so cleanup() should throw + expect(() => sw.cleanup()).toThrow( + 'cleanup() can only be called by coordinator', + ); + }); + + it('should handle cleanupIfCoordinator when not coordinator', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + autoCoordinator: false, + }); + + // cleanupIfCoordinator should be no-op when not coordinator + sw.cleanupIfCoordinator(); + + // Files should still exist + expect(vol.toJSON()).not.toStrictEqual({}); + expect(sw.getState()).toBe('active'); + }); + + it('should handle cleanup when some shard files do not exist', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + }); + + vol.unlinkSync( + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log', + ); + + // cleanupIfCoordinator won't throw even if files don't exist + expect(() => sw.cleanupIfCoordinator()).not.toThrow(); + }); + + it('should ignore directory removal failures during cleanup', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + '/shards/20231114-221320-000/keep.txt': 'keep', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + }); + + expect(() => sw.cleanup()).not.toThrow(); + expect( + vol.readFileSync('/shards/20231114-221320-000/keep.txt', 'utf8'), + ).toBe('keep'); + }); + }); + + describe('lifecycle state', () => { + it('throws with appended finalizer error when finalize fails', () => { + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'test', + finalExtension: '.json', + finalizer: () => { + throw new Error('finalizer boom'); + }, + }, + }); + + expect(() => sw.finalize()).toThrow( + /Could not finalize sharded wal\. Finalizer method in format throws\./, + ); + expect(() => sw.finalize()).toThrow(/finalizer boom/); + expect(sw.getState()).toBe('active'); + }); + + it('should start in active state', () => { + const sw = getShardedWal(); + expect(sw.getState()).toBe('active'); + expect(sw.isFinalized()).toBeFalse(); + expect(sw.isCleaned()).toBeFalse(); + }); + + it('should transition to finalized state after finalize', () => { + vol.mkdirSync('/shards/20231114-221320-000', { recursive: true }); + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'test', + finalExtension: '.json', + finalizer: records => `${JSON.stringify(records)}\n`, + }, + }); + + sw.finalize(); + + expect(sw.getState()).toBe('finalized'); + expect(sw.isFinalized()).toBeTrue(); + expect(sw.isCleaned()).toBeFalse(); + }); + + it('should transition to cleaned state after cleanup (when coordinator)', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + }); + + sw.cleanupIfCoordinator(); + + const state = sw.getState(); + expect(['active', 'cleaned']).toContain(state); + }); + + it('should make cleanup idempotent for coordinator', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + }); + + sw.cleanup(); + expect(sw.getState()).toBe('cleaned'); + + expect(() => sw.cleanup()).not.toThrow(); + expect(sw.getState()).toBe('cleaned'); + }); + + it('should prevent shard creation after finalize', () => { + vol.mkdirSync('/shards/20231114-221320-000', { recursive: true }); + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'test', + finalExtension: '.json', + finalizer: records => `${JSON.stringify(records)}\n`, + }, + }); + + sw.finalize(); + + expect(() => sw.shard()).toThrow('WAL is finalized, cannot modify'); + }); + + it('should prevent shard creation after cleanup', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + + // Generate the instance ID that will be used by the constructor + // The constructor increments ShardedWal.instanceCount, so we need to + // generate the ID using the value that will be used (current + 1) + // without actually modifying ShardedWal.instanceCount + const nextCount = ShardedWal.instanceCount + 1; + const instanceId = getUniqueInstanceId({ + next() { + return nextCount; + }, + }); + + // Set coordinator BEFORE creating instance + ShardedWal.setCoordinatorProcess(PROFILER_SHARDER_ID_ENV_VAR, instanceId); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + }); + + sw.cleanupIfCoordinator(); + + expect(() => sw.shard()).toThrow('WAL is cleaned, cannot modify'); + }); + + it('should make finalize idempotent', () => { + vol.mkdirSync('/shards/20231114-221320-000', { recursive: true }); + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'test', + finalExtension: '.json', + finalizer: records => `${JSON.stringify(records)}\n`, + }, + }); + + sw.finalize(); + expect(sw.getState()).toBe('finalized'); + + // Call again - should not throw and should remain finalized + sw.finalize(); + expect(sw.getState()).toBe('finalized'); + }); + + it('should prevent finalize after cleanup', () => { + // Generate the instance ID that will be used by the constructor + // The constructor increments ShardedWal.instanceCount, so we need to + // generate the ID using the value that will be used (current + 1) + // without actually modifying ShardedWal.instanceCount + const nextCount = ShardedWal.instanceCount + 1; + const instanceId = getUniqueInstanceId({ + next() { + return nextCount; + }, + }); + + // Set coordinator BEFORE creating instance + ShardedWal.setCoordinatorProcess(PROFILER_SHARDER_ID_ENV_VAR, instanceId); + + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'test', + walExtension: '.log', + finalExtension: '.json', + finalizer: records => `${JSON.stringify(records)}\n`, + }, + }); + + expect(sw.stats.shardFiles).toHaveLength(0); + sw.shard(); + expect(sw.stats.shardFiles).toHaveLength(0); + + sw.cleanupIfCoordinator(); + expect(sw.getState()).toBe('cleaned'); + expect(sw.stats.shardFiles).toHaveLength(0); + }); + }); +}); diff --git a/packages/utils/src/lib/wal.int.test.ts b/packages/utils/src/lib/wal.int.test.ts new file mode 100644 index 000000000..81c71709b --- /dev/null +++ b/packages/utils/src/lib/wal.int.test.ts @@ -0,0 +1,127 @@ +import fs from 'node:fs/promises'; +import path from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { + type Codec, + type WalRecord, + WriteAheadLogFile, + stringCodec, +} from './wal.js'; + +describe('WriteAheadLogFile Integration', () => { + const testDir = path.join(process.cwd(), 'tmp', 'int', 'utils', 'wal'); + let walFile: WriteAheadLogFile; + + beforeEach(async () => { + // Clean up test directory + await fs.rm(testDir, { recursive: true, force: true }); + await fs.mkdir(testDir, { recursive: true }); + }); + + afterEach(async () => { + if (walFile && !walFile.isClosed()) { + walFile.close(); + } + await fs.rm(testDir, { recursive: true, force: true }); + }); + + it('should recover from file with partial write', async () => { + const filePath = path.join(testDir, 'partial.log'); + walFile = new WriteAheadLogFile({ file: filePath, codec: stringCodec() }); + + walFile.open(); + walFile.append('complete1'); + walFile.append('complete2'); + walFile.close(); + + // Simulate partial write by appending incomplete line + await fs.appendFile(filePath, '"partial'); + + const recovered = walFile.recover(); + expect(recovered.records).toEqual(['complete1', 'complete2']); + expect(recovered.partialTail).toBe('"partial'); + }); + + it('should repack file removing invalid entries', () => { + const filePath = path.join(testDir, 'repack.log'); + const tolerantCodec: Codec = { + encode: v => (typeof v === 'string' ? v : JSON.stringify(v)), + decode: (s: string) => { + if (s === 'invalid') throw new Error('Invalid record'); + return s; + }, + }; + + walFile = new WriteAheadLogFile({ file: filePath, codec: tolerantCodec }); + walFile.open(); + walFile.append('valid1'); + walFile.append('invalid'); + walFile.append('valid2'); + walFile.close(); + + walFile.repack(); + + const recovered = walFile.recover(); + expect(recovered.records).toEqual(['valid1', 'valid2']); + }); + + it('should handle error recovery scenarios', () => { + const filePath = path.join(testDir, 'errors.log'); + const failingCodec: Codec = { + encode: v => (typeof v === 'string' ? v : JSON.stringify(v)), + decode: (s: string) => { + if (s === 'bad') throw new Error('Bad record'); + return s; + }, + }; + + walFile = new WriteAheadLogFile({ file: filePath, codec: failingCodec }); + walFile.open(); + walFile.append('good'); + walFile.append('bad'); + walFile.append('good'); + walFile.close(); + + const recovered = walFile.recover(); + expect(recovered.records).toEqual([ + 'good', + { __invalid: true, raw: 'bad' }, + 'good', + ]); + expect(recovered.errors).toEqual([]); + }); + + it('should handle object records correctly', () => { + const filePath = path.join(testDir, 'objects.log'); + walFile = new WriteAheadLogFile({ + file: filePath, + codec: stringCodec(), + }); + + walFile.open(); + walFile.append({ id: 1, name: 'test1' }); + walFile.append({ id: 2, name: 'test2' }); + walFile.close(); + + const recovered = walFile.recover(); + expect(recovered.records).toEqual([ + { id: 1, name: 'test1' }, + { id: 2, name: 'test2' }, + ]); + }); + + it('should perform complete write/recover cycle', () => { + const filePath = path.join(testDir, 'test.log'); + walFile = new WriteAheadLogFile({ file: filePath, codec: stringCodec() }); + + walFile.open(); + walFile.append('record1'); + walFile.append('record2'); + walFile.close(); + + const recovered = walFile.recover(); + expect(recovered.records).toEqual(['record1', 'record2']); + expect(recovered.errors).toEqual([]); + expect(recovered.partialTail).toBeNull(); + }); +}); diff --git a/packages/utils/src/lib/wal.ts b/packages/utils/src/lib/wal.ts index f0dc87a83..b504beabf 100644 --- a/packages/utils/src/lib/wal.ts +++ b/packages/utils/src/lib/wal.ts @@ -1,8 +1,5 @@ -/* eslint-disable max-lines */ import * as fs from 'node:fs'; import path from 'node:path'; -import process from 'node:process'; -import { threadId } from 'node:worker_threads'; /** * Codec for encoding/decoding values to/from strings for WAL storage. @@ -17,6 +14,18 @@ export type Codec = { export type InvalidEntry = { __invalid: true; raw: O }; +// eslint-disable-next-line @typescript-eslint/no-unused-vars +type CodecInput = C extends Codec ? I : never; +// eslint-disable-next-line @typescript-eslint/no-unused-vars +type CodecOutput = C extends Codec ? O : never; + +export type TolerantCodec = Codec< + CodecInput | InvalidEntry>, + CodecOutput +>; + +export type WalRecord = object | string; + /** * Interface for sinks that can append items. * Allows for different types of appendable storage (WAL, in-memory, etc.) @@ -28,29 +37,29 @@ export type AppendableSink = Recoverable & { close?: () => void; }; -/** - * Interface for sinks that support recovery operations. - * Represents the recoverable subset of AppendableSink functionality. - */ -export type Recoverable = { - recover: () => RecoverResult; - repack: (out?: string) => void; - finalize?: (opt?: Record) => void; -}; - /** * Result of recovering records from a WAL file. * Contains successfully recovered records and any errors encountered during parsing. */ export type RecoverResult = { /** Successfully recovered records */ - records: T[]; + records: (T | InvalidEntry)[]; /** Errors encountered during recovery with line numbers and context */ errors: { lineNo: number; line: string; error: Error }[]; /** Last incomplete line if file was truncated (null if clean) */ partialTail: string | null; }; +/** + * Interface for sinks that support recovery operations. + * Represents the recoverable subset of AppendableSink functionality. + */ +export type Recoverable = { + recover: () => RecoverResult; + repack: (out?: string) => void; + finalize?: (opt?: Record) => void; +}; + /** * Statistics about the WAL file state and last recovery operation. */ @@ -59,10 +68,6 @@ export type WalStats = { filePath: string; /** Whether the WAL file is currently closed */ isClosed: boolean; - /** Whether the WAL file exists on disk */ - fileExists: boolean; - /** File size in bytes (0 if file doesn't exist) */ - fileSize: number; /** Last recovery state from the most recent {@link recover} or {@link repack} operation */ lastRecovery: RecoverResult> | null; }; @@ -145,7 +150,9 @@ export function recoverFromContent( * Write-Ahead Log implementation for crash-safe append-only logging. * Provides atomic operations for writing, recovering, and repacking log entries. */ -export class WriteAheadLogFile implements AppendableSink { +export class WriteAheadLogFile + implements AppendableSink +{ #fd: number | null = null; readonly #file: string; readonly #decode: Codec>['decode']; @@ -157,8 +164,9 @@ export class WriteAheadLogFile implements AppendableSink { * @param options - Configuration options */ constructor(options: { file: string; codec: Codec }) { - this.#file = options.file; - const c = createTolerantCodec(options.codec); + const { file, codec } = options; + this.#file = file; + const c = createTolerantCodec(codec); this.#decode = c.decode; this.#encode = c.encode; } @@ -239,9 +247,8 @@ export class WriteAheadLogFile implements AppendableSink { // eslint-disable-next-line no-console console.log('Found invalid entries during WAL repack'); } - const recordsToWrite = hasInvalidEntries - ? (r.records as T[]) - : filterValidRecords(r.records); + // Always filter out invalid entries when repacking + const recordsToWrite = filterValidRecords(r.records); ensureDirectoryExistsSync(path.dirname(out)); fs.writeFileSync(out, `${recordsToWrite.map(this.#encode).join('\n')}\n`); } @@ -252,12 +259,9 @@ export class WriteAheadLogFile implements AppendableSink { * @returns Statistics object with file info and last recovery state */ getStats(): WalStats { - const fileExists = fs.existsSync(this.#file); return { filePath: this.#file, isClosed: this.#fd == null, - fileExists, - fileSize: fileExists ? fs.statSync(this.#file).size : 0, lastRecovery: this.#lastRecoveryState, }; } @@ -267,7 +271,7 @@ export class WriteAheadLogFile implements AppendableSink { * Format descriptor that binds codec and file extension together. * Prevents misconfiguration by keeping related concerns in one object. */ -export type WalFormat = { +export type WalFormat = { /** Base name for the WAL (e.g., "trace") */ baseName: string; /** Shard file extension (e.g., ".jsonl") */ @@ -277,21 +281,27 @@ export type WalFormat = { /** Codec for encoding/decoding records */ codec: Codec; /** Finalizer for converting records to a string */ + finalizer: (records: T[], opt?: Record) => string; +}; + +export type WalFormatWithInvalids = Omit< + WalFormat, + 'codec' | 'finalizer' +> & { + codec: TolerantCodec>; finalizer: ( records: (T | InvalidEntry)[], opt?: Record, ) => string; }; -export const stringCodec = < - T extends string | object = string, ->(): Codec => ({ - encode: v => (typeof v === 'string' ? v : JSON.stringify(v)), +export const stringCodec = (): Codec => ({ + encode: v => JSON.stringify(v), decode: v => { try { return JSON.parse(v) as T; } catch { - return v as T; + return v as unknown as T; } }, }); @@ -309,7 +319,7 @@ export const stringCodec = < * @param format - Partial WalFormat configuration * @returns Parsed WalFormat with defaults filled in */ -export function parseWalFormat( +export function parseWalFormat( format: Partial>, ): WalFormat { const { @@ -317,127 +327,23 @@ export function parseWalFormat( walExtension = '.log', finalExtension = walExtension, codec = stringCodec(), + finalizer, } = format; - const finalizer = - format.finalizer ?? - ((records: (T | InvalidEntry)[]) => { - // Encode each record using the codec before joining. - // For object types, codec.encode() will JSON-stringify them properly. - // InvalidEntry records use their raw string value directly. - const encoded = records.map(record => - typeof record === 'object' && record != null && '__invalid' in record - ? (record as InvalidEntry).raw - : codec.encode(record as T), - ); - return `${encoded.join('\n')}\n`; - }); - return { baseName, walExtension, finalExtension, codec, - finalizer, - } satisfies WalFormat; -} - -/** - * Determines if this process is the leader WAL process using the origin PID heuristic. - * - * The leader is the process that first enabled profiling (the one that set CP_PROFILER_ORIGIN_PID). - * All descendant processes inherit the environment but have different PIDs. - * - * @returns true if this is the leader WAL process, false otherwise - */ -export function isCoordinatorProcess( - envVarName: string, - profilerID: string, -): boolean { - return process.env[envVarName] === profilerID; -} - -/** - * Initialize the origin PID environment variable if not already set. - * This must be done as early as possible before any user code runs. - * Sets envVarName to the current process ID if not already defined. - */ -export function setCoordinatorProcess( - envVarName: string, - profilerID: string, -): void { - if (!process.env[envVarName]) { - // eslint-disable-next-line functional/immutable-data - process.env[envVarName] = profilerID; - } -} - -// eslint-disable-next-line functional/no-let -let shardCount = 0; - -/** - * Generates a unique sharded WAL ID based on performance time origin, process ID, thread ID, and instance count. - */ -function getShardedWalId() { - // eslint-disable-next-line functional/immutable-data - return `${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.${++ShardedWal.instanceCount}`; -} - -/** - * Generates a human-readable shard ID. - * This ID is unique per process/thread/shard combination and used in the file name. - * Format: readable-timestamp.pid.threadId.shardCount - * Example: "20240101-120000-000.12345.1.1" - * Becomes file: trace.20240101-120000-000.12345.1.1.log - */ -export function getShardId(): string { - const timestamp = Math.round(performance.timeOrigin + performance.now()); - const readableTimestamp = sortableReadableDateString(`${timestamp}`); - return `${readableTimestamp}.${process.pid}.${threadId}.${++shardCount}`; -} - -/** - * Generates a human-readable sharded group ID. - * This ID is a globally unique, sortable, human-readable date string per run. - * Used directly as the folder name to group shards. - * Format: yyyymmdd-hhmmss-ms - * Example: "20240101-120000-000" - */ -export function getShardedGroupId(): string { - return sortableReadableDateString( - `${Math.round(performance.timeOrigin + performance.now())}`, - ); -} - -/** - * Regex patterns for validating WAL ID formats - */ -export const WAL_ID_PATTERNS = { - /** Readable date format: yyyymmdd-hhmmss-ms */ - READABLE_DATE: /^\d{8}-\d{6}-\d{3}$/, - /** Group ID format: yyyymmdd-hhmmss-ms */ - GROUP_ID: /^\d{8}-\d{6}-\d{3}$/, - /** Shard ID format: readable-date.pid.threadId.count */ - SHARD_ID: /^\d{8}-\d{6}-\d{3}(?:\.\d+){3}$/, -} as const; - -export function sortableReadableDateString(timestampMs: string): string { - const timestamp = Number.parseInt(timestampMs, 10); - const date = new Date(timestamp); - const MILLISECONDS_PER_SECOND = 1000; - const yyyy = date.getFullYear(); - const mm = String(date.getMonth() + 1).padStart(2, '0'); - const dd = String(date.getDate()).padStart(2, '0'); - const hh = String(date.getHours()).padStart(2, '0'); - const min = String(date.getMinutes()).padStart(2, '0'); - const ss = String(date.getSeconds()).padStart(2, '0'); - // eslint-disable-next-line @typescript-eslint/no-magic-numbers - const ms = String(timestamp % MILLISECONDS_PER_SECOND).padStart(3, '0'); - - return `${yyyy}${mm}${dd}-${hh}${min}${ss}-${ms}`; + finalizer: + finalizer ?? + ((records, _opt) => + `${records.map(record => codec.encode(record)).join('\n')}\n`), + }; } /** + * NOTE: this helper is only used in this file. The rest of the repo avoids sync methods so it is not reusable. * Ensures a directory exists, creating it recursively if necessary using sync methods. * @param dirPath - The directory path to ensure exists */ @@ -446,177 +352,3 @@ function ensureDirectoryExistsSync(dirPath: string): void { fs.mkdirSync(dirPath, { recursive: true }); } } - -/** - * Generates a path to a shard file using human-readable IDs. - * Both groupId and shardId are already in readable date format. - * - * Example with groupId "20240101-120000-000" and shardId "20240101-120000-000.12345.1.1": - * Full path: /base/20240101-120000-000/trace.20240101-120000-000.12345.1.1.log - * - * @param opt.dir - The directory to store the shard file - * @param opt.format - The WalFormat to use for the shard file - * @param opt.groupId - The human-readable group ID (yyyymmdd-hhmmss-ms format) - * @param opt.shardId - The human-readable shard ID (readable-timestamp.pid.threadId.count format) - * @returns The path to the shard file - */ -export function getShardedPath(opt: { - dir?: string; - format: WalFormat; - groupId: string; - shardId: string; -}): string { - const { dir = '', format, groupId, shardId } = opt; - const { baseName, walExtension } = format; - - return path.join(dir, groupId, `${baseName}.${shardId}${walExtension}`); -} - -export function getShardedFinalPath(opt: { - dir?: string; - format: WalFormat; - groupId: string; -}): string { - const { dir = '', format, groupId } = opt; - const { baseName, finalExtension } = format; - - return path.join(dir, groupId, `${baseName}.${groupId}${finalExtension}`); -} - -/** - * Sharded Write-Ahead Log manager for coordinating multiple WAL shards. - * Handles distributed logging across multiple processes/files with atomic finalization. - */ - -export class ShardedWal { - static instanceCount = 0; - readonly #id: string = getShardedWalId(); - readonly groupId = getShardedGroupId(); - readonly #format: WalFormat; - readonly #dir: string = process.cwd(); - readonly #isCoordinator: boolean; - - /** - * Create a sharded WAL manager. - * - * @param opt.dir - Base directory to store shard files (defaults to process.cwd()) - * @param opt.format - WAL format configuration - * @param opt.groupId - Group ID for sharding (defaults to generated group ID) - * @param opt.coordinatorIdEnvVar - Environment variable name for storing coordinator ID (defaults to CP_SHARDED_WAL_COORDINATOR_ID) - */ - constructor(opt: { - dir?: string; - format: Partial>; - groupId?: string; - coordinatorIdEnvVar: string; - }) { - const { dir, format, groupId, coordinatorIdEnvVar } = opt; - this.groupId = groupId ?? getShardedGroupId(); - if (dir) { - this.#dir = dir; - } - this.#format = parseWalFormat(format); - this.#isCoordinator = isCoordinatorProcess(coordinatorIdEnvVar, this.#id); - } - - /** - * Is this instance the coordinator? - * - * Coordinator status is determined from the coordinatorIdEnvVar environment variable. - * The coordinator handles finalization and cleanup of shard files. - * - * @returns true if this instance is the coordinator, false otherwise - */ - isCoordinator(): boolean { - return this.#isCoordinator; - } - - shard(shardId: string = getShardId()) { - return new WriteAheadLogFile({ - file: getShardedPath({ - dir: this.#dir, - format: this.#format, - groupId: this.groupId, - shardId, - }), - codec: this.#format.codec, - }); - } - - /** Get all shard file paths matching this WAL's base name */ - private shardFiles() { - if (!fs.existsSync(this.#dir)) { - return []; - } - - const groupIdDir = path.dirname( - getShardedFinalPath({ - dir: this.#dir, - format: this.#format, - groupId: this.groupId, - }), - ); - // create dir if not existing - ensureDirectoryExistsSync(groupIdDir); - - return fs - .readdirSync(groupIdDir) - .filter(entry => entry.endsWith(this.#format.walExtension)) - .filter(entry => entry.startsWith(`${this.#format.baseName}`)) - .map(entry => path.join(groupIdDir, entry)); - } - - /** - * Finalize all shards by merging them into a single output file. - * Recovers all records from all shards, validates no errors, and writes merged result. - * @throws Error if any shard contains decode errors - */ - finalize(opt?: Record) { - const fileRecoveries = this.shardFiles().map(f => ({ - file: f, - recovery: new WriteAheadLogFile({ - file: f, - codec: this.#format.codec, - }).recover(), - })); - - const records = fileRecoveries.flatMap(({ recovery }) => recovery.records); - - // Check if any records are invalid entries (from tolerant codec) - const hasInvalidEntries = records.some( - r => typeof r === 'object' && r != null && '__invalid' in r, - ); - - const recordsToFinalize = hasInvalidEntries - ? records - : filterValidRecords(records); - const out = getShardedFinalPath({ - dir: this.#dir, - format: this.#format, - groupId: this.groupId, - }); - ensureDirectoryExistsSync(path.dirname(out)); - fs.writeFileSync(out, this.#format.finalizer(recordsToFinalize, opt)); - } - - cleanup() { - this.shardFiles().forEach(f => { - // Remove the shard file - fs.unlinkSync(f); - // Remove the parent directory (shard group directory) - const shardDir = path.dirname(f); - try { - fs.rmdirSync(shardDir); - } catch { - // Directory might not be empty or already removed, ignore - } - }); - - // Also try to remove the root directory if it becomes empty - try { - fs.rmdirSync(this.#dir); - } catch { - // Directory might not be empty or already removed, ignore - } - } -} diff --git a/packages/utils/src/lib/wal.unit.test.ts b/packages/utils/src/lib/wal.unit.test.ts index 4221d4f0f..03513c57d 100644 --- a/packages/utils/src/lib/wal.unit.test.ts +++ b/packages/utils/src/lib/wal.unit.test.ts @@ -1,27 +1,18 @@ import { vol } from 'memfs'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; import { MEMFS_VOLUME } from '@code-pushup/test-utils'; -import { SHARDED_WAL_COORDINATOR_ID_ENV_VAR } from './profiler/constants.js'; import { type Codec, - type InvalidEntry, - ShardedWal, - WAL_ID_PATTERNS, WriteAheadLogFile, createTolerantCodec, filterValidRecords, - getShardId, - getShardedGroupId, - isCoordinatorProcess, parseWalFormat, recoverFromContent, - setCoordinatorProcess, stringCodec, } from './wal.js'; const read = (p: string) => vol.readFileSync(p, 'utf8') as string; - const write = (p: string, c: string) => vol.writeFileSync(p, c); - const wal = ( file: string, codec: Codec = stringCodec(), @@ -38,8 +29,7 @@ describe('createTolerantCodec', () => { }, }); expect(() => c.encode(42)).toThrow('encoding error'); - const result = c.decode('42'); - expect(result).toEqual({ __invalid: true, raw: '42' }); + expect(c.decode('42')).toEqual({ __invalid: true, raw: '42' }); }); it('round-trips valid values and preserves invalid ones', () => { @@ -52,7 +42,6 @@ describe('createTolerantCodec', () => { }, }); expect(c.decode(c.encode(42))).toBe(42); - const invalid = c.decode('x'); expect(invalid).toStrictEqual({ __invalid: true, raw: 'x' }); expect(c.encode(invalid)).toBe('x'); @@ -66,8 +55,7 @@ describe('filterValidRecords', () => { { __invalid: true, raw: 'x' }, { id: 3, name: 'valid3' }, ]; - const result = filterValidRecords(records); - expect(result).toEqual([ + expect(filterValidRecords(records)).toEqual([ { id: 1, name: 'valid1' }, { id: 3, name: 'valid3' }, ]); @@ -76,8 +64,7 @@ describe('filterValidRecords', () => { describe('recoverFromContent', () => { it('recovers valid records', () => { - const content = 'a\nb\n'; - const result = recoverFromContent(content, stringCodec().decode); + const result = recoverFromContent('a\nb\n', stringCodec().decode); expect(result).toEqual({ records: ['a', 'b'], errors: [], @@ -86,9 +73,7 @@ describe('recoverFromContent', () => { }); it('handles empty content', () => { - const content = ''; - const result = recoverFromContent(content, stringCodec().decode); - expect(result).toEqual({ + expect(recoverFromContent('', stringCodec().decode)).toEqual({ records: [], errors: [], partialTail: null, @@ -96,18 +81,13 @@ describe('recoverFromContent', () => { }); it('handles content without trailing newline', () => { - const content = 'a\nb'; - const result = recoverFromContent(content, stringCodec().decode); - expect(result).toEqual({ - records: ['a'], - errors: [], - partialTail: 'b', - }); + const result = recoverFromContent('a\nb', stringCodec().decode); + expect(result.records).toEqual(['a']); + expect(result.partialTail).toBe('b'); }); it('skips empty lines', () => { - const content = 'a\n\nb\n'; - const result = recoverFromContent(content, stringCodec().decode); + const result = recoverFromContent('a\n\nb\n', stringCodec().decode); expect(result).toEqual({ records: ['a', 'b'], errors: [], @@ -124,9 +104,7 @@ describe('recoverFromContent', () => { }, }; - const content = 'good\nbad\ngood\n'; - const result = recoverFromContent(content, failingCodec.decode); - + const result = recoverFromContent('good\nbad\ngood\n', failingCodec.decode); expect(result.records).toEqual(['good', 'good']); expect(result.errors).toHaveLength(1); expect(result.errors[0]).toEqual({ @@ -134,7 +112,6 @@ describe('recoverFromContent', () => { line: 'bad', error: expect.any(Error), }); - expect(result.errors.at(0)?.error.message).toBe('Bad record'); expect(result.partialTail).toBeNull(); }); @@ -147,9 +124,10 @@ describe('recoverFromContent', () => { }, }; - const content = 'good\nbad\npartial'; - const result = recoverFromContent(content, failingCodec.decode); - + const result = recoverFromContent( + 'good\nbad\npartial', + failingCodec.decode, + ); expect(result.records).toEqual(['good']); expect(result.errors).toHaveLength(1); expect(result.errors.at(0)?.lineNo).toBe(2); @@ -163,416 +141,224 @@ describe('WriteAheadLogFile', () => { vol.fromJSON({}, MEMFS_VOLUME); }); - it('should act as WLA for any kind of data', () => { - const w = wal('/test/a.log', stringCodec()); - w.open(); - w.append({ id: 1, name: 'test' }); - w.close(); - expect(w.recover().records).toStrictEqual([{ id: 1, name: 'test' }]); - w.open(); - expect(() => - w.append('{ id: 1, name:...' as unknown as object), - ).not.toThrow(); - w.close(); - expect(w.recover().records).toStrictEqual([ - { id: 1, name: 'test' }, - '{ id: 1, name:...', - ]); - }); - - it('should create instance with file path and codecs without opening', () => { - const w = wal('/test/a.log'); - expect(w).toBeInstanceOf(WriteAheadLogFile); - expect(w.getPath()).toBe('/test/a.log'); - expect(w.isClosed()).toBeTrue(); - }); - - it('throws error when appending without opening', () => { - const w = wal('/test/a.log'); - expect(w.isClosed()).toBeTrue(); - expect(() => w.append('a')).toThrow('WAL not opened'); - }); - - it('opens and closes correctly', () => { - const w = wal('/test/a.log'); - expect(w.isClosed()).toBeTrue(); - w.open(); - expect(w.isClosed()).toBeFalse(); - w.close(); - expect(w.isClosed()).toBeTrue(); - }); - - it('multiple open calls are idempotent', () => { - const w = wal('/test/a.log'); - expect(w.isClosed()).toBeTrue(); - - w.open(); - expect(w.isClosed()).toBeFalse(); - - w.open(); - expect(w.isClosed()).toBeFalse(); - w.open(); - expect(w.isClosed()).toBeFalse(); - - w.close(); - expect(w.isClosed()).toBeTrue(); - }); - - it('append lines if opened', () => { - vol.mkdirSync('/test', { recursive: true }); - const w = wal('/test/a.log'); - w.open(); - w.append('a'); - w.append('b'); - - expect(read('/test/a.log')).toBe('a\nb\n'); - }); - - it('appends records with encode logic', () => { - const w = wal('/test/a.log'); - w.open(); - - w.append('any string'); - expect(read('/test/a.log')).toBe('any string\n'); + describe('initialization', () => { + it('should create instance with file path and codec without opening', () => { + const w = wal('/test/a.log'); + expect(w).toBeInstanceOf(WriteAheadLogFile); + expect(w.getPath()).toBe('/test/a.log'); + expect(w.isClosed()).toBeTrue(); + }); }); - it('returns empty result when file does not exist', () => { - const w = wal('/test/nonexistent.log'); - const result = w.recover(); + describe('lifecycle', () => { + it('opens and closes correctly', () => { + const w = wal('/test/a.log'); + expect(w.isClosed()).toBeTrue(); + w.open(); + expect(w.isClosed()).toBeFalse(); + w.close(); + expect(w.isClosed()).toBeTrue(); + }); - expect(result).toEqual({ - records: [], - errors: [], - partialTail: null, + it('multiple open calls are idempotent', () => { + const w = wal('/test/a.log'); + w.open(); + expect(w.isClosed()).toBeFalse(); + w.open(); + w.open(); + expect(w.isClosed()).toBeFalse(); + w.close(); + expect(w.isClosed()).toBeTrue(); }); }); - it('can recover without opening (reads file directly)', () => { - vol.mkdirSync('/test', { recursive: true }); - write('/test/a.log', 'line1\nline2\n'); - const w = wal('/test/a.log'); + describe('append operations', () => { + it('throws error when appending without opening', () => { + const w = wal('/test/a.log'); + expect(() => w.append('a')).toThrow('WAL not opened'); + }); - const result = w.recover(); - expect(result.records).toStrictEqual(['line1', 'line2']); - expect(result.errors).toEqual([]); - }); + it('appends records with encoding', () => { + vol.mkdirSync('/test', { recursive: true }); + const w = wal('/test/a.log'); + w.open(); + w.append('a'); + w.append('b'); + expect(read('/test/a.log')).toBe('"a"\n"b"\n'); + }); - it('recovers valid records if opened', () => { - vol.mkdirSync('/test', { recursive: true }); - write('/test/a.log', 'line1\nline2\n'); - const w = wal('/test/a.log'); - w.open(); - expect(w.recover()).toStrictEqual({ - records: ['line1', 'line2'], - errors: [], - partialTail: null, + it('handles any kind of data', () => { + const w = wal('/test/a.log', stringCodec()); + w.open(); + w.append({ id: 1, name: 'test' }); + w.close(); + expect(w.recover().records).toStrictEqual([{ id: 1, name: 'test' }]); }); }); - it('recovers with decode errors and partial tail using tolerant codec', () => { - vol.mkdirSync('/test', { recursive: true }); - write('/test/a.log', 'ok\nbad\npartial'); - - const tolerantCodec = createTolerantCodec({ - encode: (s: string) => s, - decode: (s: string) => { - if (s === 'bad') throw new Error('Bad record'); - return s; - }, + describe('recovery operations', () => { + it('returns empty result when file does not exist', () => { + const result = wal('/test/nonexistent.log').recover(); + expect(result).toEqual({ + records: [], + errors: [], + partialTail: null, + }); }); - expect(wal('/test/a.log', tolerantCodec).recover()).toStrictEqual({ - records: ['ok', { __invalid: true, raw: 'bad' }], - errors: [], - partialTail: 'partial', + it('recovers valid records from file', () => { + vol.mkdirSync('/test', { recursive: true }); + write('/test/a.log', 'line1\nline2\n'); + const result = wal('/test/a.log').recover(); + expect(result.records).toStrictEqual(['line1', 'line2']); + expect(result.errors).toEqual([]); + expect(result.partialTail).toBeNull(); }); - }); - it('repacks clean file without errors', () => { - vol.mkdirSync('/test', { recursive: true }); - write('/test/a.log', 'a\nb\n'); - wal('/test/a.log').repack(); - expect(read('/test/a.log')).toBe('a\nb\n'); - }); + it('recovers with decode errors and partial tail using tolerant codec', () => { + vol.mkdirSync('/test', { recursive: true }); + write('/test/a.log', 'ok\nbad\npartial'); - it('repacks with decode errors using tolerant codec', () => { - vol.mkdirSync('/test', { recursive: true }); - write('/test/a.log', 'ok\nbad\n'); + const tolerantCodec = createTolerantCodec({ + encode: (s: string) => s, + decode: (s: string) => { + if (s === 'bad') throw new Error('Bad record'); + return s; + }, + }); - const tolerantCodec = createTolerantCodec({ - encode: (s: string) => s, - decode: (s: string) => { - if (s === 'bad') throw new Error('Bad record'); - return s; - }, + const result = wal('/test/a.log', tolerantCodec).recover(); + expect(result).toStrictEqual({ + records: ['ok', { __invalid: true, raw: 'bad' }], + errors: [], + partialTail: 'partial', + }); }); - - wal('/test/a.log', tolerantCodec).repack(); - expect(read('/test/a.log')).toBe('ok\nbad\n'); }); - it('logs decode errors during content recovery', () => { - const failingCodec: Codec = { - encode: (s: string) => s, - decode: (s: string) => { - if (s === 'bad') throw new Error('Bad record during recovery'); - return s; - }, - }; - - const content = 'good\nbad\ngood\n'; - const result = recoverFromContent(content, failingCodec.decode); + describe('repack operations', () => { + it('repacks clean file without errors', () => { + vol.mkdirSync('/test', { recursive: true }); + write('/test/a.log', '"a"\n"b"\n'); + wal('/test/a.log').repack(); + expect(read('/test/a.log')).toBe('"a"\n"b"\n'); + }); - expect(result.errors).toHaveLength(1); - expect(result.errors.at(0)?.error.message).toBe( - 'Bad record during recovery', - ); - expect(result.records).toEqual(['good', 'good']); - }); + it('repacks with decode errors using tolerant codec', () => { + const consoleLogSpy = vi + .spyOn(console, 'log') + .mockImplementation(() => {}); + vol.mkdirSync('/test', { recursive: true }); + write('/test/a.log', 'ok\nbad\n'); - it('repacks with invalid entries and logs warning', () => { - const consoleLogSpy = vi.spyOn(console, 'log').mockImplementation(() => {}); + const tolerantCodec = createTolerantCodec({ + encode: (s: string) => s, + decode: (s: string) => { + if (s === 'bad') throw new Error('Bad record'); + return s; + }, + }); - vol.mkdirSync('/test', { recursive: true }); - write('/test/a.log', 'ok\nbad\n'); + wal('/test/a.log', tolerantCodec).repack(); - const tolerantCodec = createTolerantCodec({ - encode: (s: string) => s, - decode: (s: string) => { - if (s === 'bad') throw new Error('Bad record'); - return s; - }, + expect(consoleLogSpy).toHaveBeenCalledWith( + 'Found invalid entries during WAL repack', + ); + // Repack filters out invalid entries, so only valid records remain + expect(read('/test/a.log')).toBe('ok\n'); + consoleLogSpy.mockRestore(); }); - wal('/test/a.log', tolerantCodec).repack(); - - expect(consoleLogSpy).toHaveBeenCalledWith( - 'Found invalid entries during WAL repack', - ); - expect(read('/test/a.log')).toBe('ok\nbad\n'); - - consoleLogSpy.mockRestore(); - }); + it('logs decode errors when recover returns errors', () => { + const consoleLogSpy = vi + .spyOn(console, 'log') + .mockImplementation(() => {}); + vol.mkdirSync('/test', { recursive: true }); + write('/test/a.log', 'content\n'); - it('recoverFromContent handles decode errors and returns them', () => { - const failingCodec: Codec = { - encode: (s: string) => s, - decode: (s: string) => { - if (s === 'bad') throw new Error('Bad record during recovery'); - return s; - }, - }; + const walInstance = wal('/test/a.log'); + const recoverSpy = vi.spyOn(walInstance, 'recover').mockReturnValue({ + records: ['content'], + errors: [ + { lineNo: 1, line: 'content', error: new Error('Mock decode error') }, + ], + partialTail: null, + }); - const content = 'good\nbad\ngood\n'; - const result = recoverFromContent(content, failingCodec.decode); + walInstance.repack(); - expect(result.records).toEqual(['good', 'good']); - expect(result.errors).toHaveLength(1); - expect(result).toHaveProperty( - 'errors', - expect.arrayContaining([ - { - lineNo: 2, - line: 'bad', - error: expect.any(Error), - }, - ]), - ); + expect(consoleLogSpy).toHaveBeenCalledWith( + 'WAL repack encountered decode errors', + ); + recoverSpy.mockRestore(); + consoleLogSpy.mockRestore(); + }); }); - it('repack logs decode errors when recover returns errors', () => { - const consoleLogSpy = vi.spyOn(console, 'log').mockImplementation(() => {}); - - vol.mkdirSync('/test', { recursive: true }); - write('/test/a.log', 'content\n'); - - const walInstance = wal('/test/a.log'); - - const recoverSpy = vi.spyOn(walInstance, 'recover').mockReturnValue({ - records: ['content'], - errors: [ - { lineNo: 1, line: 'content', error: new Error('Mock decode error') }, - ], - partialTail: null, + describe('statistics', () => { + it('getStats returns file information and recovery state', () => { + vol.mkdirSync('/test', { recursive: true }); + const w = wal('/test/a.log'); + const stats = w.getStats(); + expect(stats.filePath).toBe('/test/a.log'); + expect(stats.isClosed).toBeTrue(); + expect(stats.lastRecovery).toBeNull(); }); - - walInstance.repack(); - - expect(consoleLogSpy).toHaveBeenCalledWith( - 'WAL repack encountered decode errors', - ); - - recoverSpy.mockRestore(); - consoleLogSpy.mockRestore(); }); }); describe('stringCodec', () => { - it('should encode strings as-is', () => { - const codec = stringCodec(); - expect(codec.encode('hello')).toBe('hello'); - expect(codec.encode('')).toBe(''); - expect(codec.encode('with spaces')).toBe('with spaces'); - }); + it('encodes strings and objects as JSON', () => { + const codec = stringCodec(); + expect(codec.encode('hello')).toBe('"hello"'); + expect(codec.encode('')).toBe('""'); - it('should encode objects as JSON strings', () => { - const codec = stringCodec(); + const objCodec = stringCodec(); const obj = { name: 'test', value: 42 }; - expect(codec.encode(obj)).toBe('{"name":"test","value":42}'); - }); - - it('should encode mixed types correctly', () => { - const codec = stringCodec(); - expect(codec.encode('string value')).toBe('string value'); - expect(codec.encode({ key: 'value' })).toBe('{"key":"value"}'); - expect(codec.encode([1, 2, 3])).toBe('[1,2,3]'); + expect(objCodec.encode(obj)).toBe('{"name":"test","value":42}'); }); - it('should decode valid JSON strings', () => { - const codec = stringCodec(); - const jsonString = '{"name":"test","value":42}'; - const result = codec.decode(jsonString); - expect(result).toEqual({ name: 'test', value: 42 }); - }); - - it('should decode arrays from JSON strings', () => { - const codec = stringCodec(); - const jsonString = '[1,2,3]'; - const result = codec.decode(jsonString); - expect(result).toEqual([1, 2, 3]); + it('decodes valid JSON strings', () => { + const codec = stringCodec(); + expect(codec.decode('{"name":"test","value":42}')).toEqual({ + name: 'test', + value: 42, + }); + expect(codec.decode('[1,2,3]')).toEqual([1, 2, 3]); }); - it('should return strings as-is when JSON parsing fails', () => { - const codec = stringCodec(); + it('returns strings as-is when JSON parsing fails', () => { + const codec = stringCodec(); expect(codec.decode('not json')).toBe('not json'); - expect(codec.decode('hello world')).toBe('hello world'); - expect(codec.decode('')).toBe(''); - }); - - it('should handle malformed JSON gracefully', () => { - const codec = stringCodec(); expect(codec.decode('{invalid')).toBe('{invalid'); - expect(codec.decode('[1,2,')).toBe('[1,2,'); - expect(codec.decode('null')).toBeNull(); - }); - - it('should round-trip strings correctly', () => { - const codec = stringCodec(); - const original = 'hello world'; - const encoded = codec.encode(original); - const decoded = codec.decode(encoded); - expect(decoded).toBe(original); - }); - - it('should round-trip objects correctly', () => { - const codec = stringCodec(); - const original = { name: 'test', nested: { value: 123 } }; - const encoded = codec.encode(original); - const decoded = codec.decode(encoded); - expect(decoded).toEqual(original); - }); - - it('should round-trip arrays correctly', () => { - const codec = stringCodec(); - const original = [1, 'two', { three: 3 }]; - const encoded = codec.encode(original); - const decoded = codec.decode(encoded); - expect(decoded).toEqual(original); - }); - - it('should maintain type safety with generics', () => { - const stringCodecInstance = stringCodec(); - const str: string = stringCodecInstance.decode('test'); - expect(typeof str).toBe('string'); - - const objectCodecInstance = stringCodec<{ id: number; name: string }>(); - const obj = objectCodecInstance.decode('{"id":1,"name":"test"}'); - expect(obj).toEqual({ id: 1, name: 'test' }); - - const unionCodecInstance = stringCodec(); - expect(unionCodecInstance.decode('string')).toBe('string'); - expect(unionCodecInstance.decode('[1,2,3]')).toEqual([1, 2, 3]); }); - it('should handle special JSON values', () => { - const codec = stringCodec(); + it('handles special JSON values', () => { + const codec = stringCodec(); expect(codec.decode('null')).toBeNull(); expect(codec.decode('true')).toBeTrue(); expect(codec.decode('false')).toBeFalse(); - expect(codec.decode('"quoted string"')).toBe('quoted string'); expect(codec.decode('42')).toBe(42); }); -}); - -describe('getShardId', () => { - it('should generate shard ID with readable timestamp', () => { - const result = getShardId(); - - expect(result).toMatch(WAL_ID_PATTERNS.SHARD_ID); - expect(result).toStartWith('20231114-221320-000.'); - }); - - it('should generate different shard IDs for different calls', () => { - const result1 = getShardId(); - const result2 = getShardId(); - - expect(result1).not.toBe(result2); - expect(result1).toStartWith('20231114-221320-000.'); - expect(result2).toStartWith('20231114-221320-000.'); - }); - - it('should handle zero values', () => { - const result = getShardId(); - expect(result).toStartWith('20231114-221320-000.'); - }); - - it('should handle negative timestamps', () => { - const result = getShardId(); - - expect(result).toStartWith('20231114-221320-000.'); - }); - - it('should handle large timestamps', () => { - const result = getShardId(); - - expect(result).toStartWith('20231114-221320-000.'); - }); - - it('should generate incrementing counter', () => { - const result1 = getShardId(); - const result2 = getShardId(); - const parts1 = result1.split('.'); - const parts2 = result2.split('.'); - const counter1 = parts1.at(-1) as string; - const counter2 = parts2.at(-1) as string; + it('round-trips values correctly', () => { + const stringCodecInstance = stringCodec(); + const original = 'hello world'; + expect( + stringCodecInstance.decode(stringCodecInstance.encode(original)), + ).toBe(original); - expect(Number.parseInt(counter1, 10)).toBe( - Number.parseInt(counter2, 10) - 1, + const objectCodecInstance = stringCodec(); + const obj = { name: 'test', nested: { value: 123 } }; + expect(objectCodecInstance.decode(objectCodecInstance.encode(obj))).toEqual( + obj, ); }); }); -describe('getShardedGroupId', () => { - it('should work with mocked timeOrigin', () => { - const result = getShardedGroupId(); - - expect(result).toBe('20231114-221320-000'); - expect(result).toMatch(WAL_ID_PATTERNS.GROUP_ID); - }); - - it('should be idempotent within same process', () => { - const result1 = getShardedGroupId(); - const result2 = getShardedGroupId(); - - expect(result1).toBe(result2); - }); -}); - describe('parseWalFormat', () => { - it('should apply all defaults when given empty config', () => { + it('applies all defaults when given empty config', () => { const result = parseWalFormat({}); - expect(result.baseName).toBe('wal'); expect(result.walExtension).toBe('.log'); expect(result.finalExtension).toBe('.log'); @@ -580,441 +366,46 @@ describe('parseWalFormat', () => { expect(typeof result.finalizer).toBe('function'); }); - it('should use provided baseName and default others', () => { - const result = parseWalFormat({ baseName: 'test' }); - - expect(result.baseName).toBe('test'); - expect(result.walExtension).toBe('.log'); - expect(result.finalExtension).toBe('.log'); - }); - - it('should use provided walExtension and default finalExtension to match', () => { - const result = parseWalFormat({ walExtension: '.wal' }); - - expect(result.walExtension).toBe('.wal'); - expect(result.finalExtension).toBe('.wal'); - }); - - it('should use provided finalExtension independently', () => { + it('uses provided parameters and defaults others', () => { + const customCodec = stringCodec(); const result = parseWalFormat({ + baseName: 'test', walExtension: '.wal', finalExtension: '.json', + codec: customCodec, }); - + expect(result.baseName).toBe('test'); expect(result.walExtension).toBe('.wal'); expect(result.finalExtension).toBe('.json'); + expect(result.codec.encode('value')).toBe(customCodec.encode('value')); }); - it('should use provided codec', () => { - const customCodec = stringCodec(); - const result = parseWalFormat({ codec: customCodec }); - - expect(result.codec).toBe(customCodec); + it('defaults finalExtension to walExtension when not provided', () => { + const result = parseWalFormat({ walExtension: '.wal' }); + expect(result.walExtension).toBe('.wal'); + expect(result.finalExtension).toBe('.wal'); }); - it('should use custom finalizer function', () => { + it('uses custom finalizer function', () => { const customFinalizer = (records: any[]) => `custom: ${records.length}`; const result = parseWalFormat({ finalizer: customFinalizer }); - expect(result.finalizer(['a', 'b'])).toBe('custom: 2'); }); - it('should work with all custom parameters', () => { - const config = { - baseName: 'my-wal', - walExtension: '.wal', - finalExtension: '.json', - codec: stringCodec(), - finalizer: (records: any[]) => JSON.stringify(records), - }; - - const result = parseWalFormat(config); - - expect(result.baseName).toBe('my-wal'); - expect(result.walExtension).toBe('.wal'); - expect(result.finalExtension).toBe('.json'); - expect(result.codec).toBe(config.codec); - expect(result.finalizer(['test'])).toBe('["test"]'); - }); - - it('should use default finalizer when none provided', () => { - const result = parseWalFormat({ baseName: 'test' }); - expect(result.finalizer(['line1', 'line2'])).toBe('line1\nline2\n'); + it('uses default finalizer when none provided', () => { + const result = parseWalFormat({ baseName: 'test' }); + expect(result.finalizer(['line1', 'line2'])).toBe('"line1"\n"line2"\n'); expect(result.finalizer([])).toBe('\n'); }); - it('should encode objects to JSON strings in default finalizer', () => { - const result = parseWalFormat({ baseName: 'test' }); + it('encodes objects to JSON strings in default finalizer', () => { + const result = parseWalFormat({ baseName: 'test' }); const records = [ { id: 1, name: 'test' }, { id: 2, name: 'test2' }, ]; - const output = result.finalizer(records); - expect(output).toBe('{"id":1,"name":"test"}\n{"id":2,"name":"test2"}\n'); - }); - - it('should handle InvalidEntry in default finalizer', () => { - const result = parseWalFormat({ baseName: 'test' }); - const records: (string | InvalidEntry)[] = [ - 'valid', - { __invalid: true, raw: 'invalid-raw' }, - 'also-valid', - ]; - const output = result.finalizer(records); - expect(output).toBe('valid\ninvalid-raw\nalso-valid\n'); - }); - - it('should encode objects correctly when using default type parameter', () => { - // Test parseWalFormat({}) with default type parameter (object) - const result = parseWalFormat({}); - const records = [ - { id: 1, name: 'test1' }, - { id: 2, name: 'test2' }, - ]; - const output = result.finalizer(records); - // Should be JSON strings, not [object Object] - expect(output).toBe('{"id":1,"name":"test1"}\n{"id":2,"name":"test2"}\n'); - expect(output).not.toContain('[object Object]'); - }); -}); - -describe('isCoordinatorProcess', () => { - it('should return true when env var matches current pid', () => { - const profilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - vi.stubEnv('TEST_LEADER_PID', profilerId); - - const result = isCoordinatorProcess('TEST_LEADER_PID', profilerId); - expect(result).toBeTrue(); - }); - - it('should return false when env var does not match current profilerId', () => { - const wrongProfilerId = `${Math.round(performance.timeOrigin)}${process.pid}.2.0`; - vi.stubEnv('TEST_LEADER_PID', wrongProfilerId); - - const currentProfilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - const result = isCoordinatorProcess('TEST_LEADER_PID', currentProfilerId); - expect(result).toBeFalse(); - }); - - it('should return false when env var is not set', () => { - vi.stubEnv('NON_EXISTENT_VAR', undefined as any); - - const profilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - const result = isCoordinatorProcess('NON_EXISTENT_VAR', profilerId); - expect(result).toBeFalse(); - }); - - it('should return false when env var is empty string', () => { - vi.stubEnv('TEST_LEADER_PID', ''); - - const profilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - const result = isCoordinatorProcess('TEST_LEADER_PID', profilerId); - expect(result).toBeFalse(); - }); -}); - -describe('setCoordinatorProcess', () => { - beforeEach(() => { - // Clean up any existing TEST_ORIGIN_PID - // eslint-disable-next-line functional/immutable-data - delete process.env['TEST_ORIGIN_PID']; - }); - - it('should set env var when not already set', () => { - expect(process.env['TEST_ORIGIN_PID']).toBeUndefined(); - - const profilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - setCoordinatorProcess('TEST_ORIGIN_PID', profilerId); - - expect(process.env['TEST_ORIGIN_PID']).toBe(profilerId); - }); - - it('should not overwrite existing env var', () => { - const existingProfilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - const newProfilerId = `${Math.round(performance.timeOrigin)}${process.pid}.2.0`; - - vi.stubEnv('TEST_ORIGIN_PID', existingProfilerId); - setCoordinatorProcess('TEST_ORIGIN_PID', newProfilerId); - - expect(process.env['TEST_ORIGIN_PID']).toBe(existingProfilerId); - }); - - it('should set env var to profiler id', () => { - const profilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - setCoordinatorProcess('TEST_ORIGIN_PID', profilerId); - - expect(process.env['TEST_ORIGIN_PID']).toBe(profilerId); - }); -}); - -describe('ShardedWal', () => { - beforeEach(() => { - vol.reset(); - vol.fromJSON({}, MEMFS_VOLUME); - }); - - it('should create instance with directory and format', () => { - const sw = new ShardedWal({ - dir: '/test/shards', - format: { - baseName: 'test-wal', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - expect(sw).toBeInstanceOf(ShardedWal); - }); - - it('should create shard with correct file path', () => { - const sw = new ShardedWal({ - dir: '/test/shards', - format: { - baseName: 'trace', - walExtension: '.log', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - const shard = sw.shard('20231114-221320-000.1.2.3'); - expect(shard).toBeInstanceOf(WriteAheadLogFile); - expect(shard.getPath()).toMatchPath( - '/test/shards/20231114-221320-000/trace.20231114-221320-000.1.2.3.log', - ); - }); - - it('should create shard with default shardId when no argument provided', () => { - const sw = new ShardedWal({ - dir: '/test/shards', - format: { - baseName: 'trace', - walExtension: '.log', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - const shard = sw.shard(); - expect(shard.getPath()).toStartWithPath( - '/test/shards/20231114-221320-000/trace.20231114-221320-000.10001', - ); - expect(shard.getPath()).toEndWithPath('.log'); - }); - - it('should list no shard files when directory does not exist', () => { - const sw = new ShardedWal({ - dir: '/nonexistent', - format: { - baseName: 'test-wal', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - const files = (sw as any).shardFiles(); - expect(files).toEqual([]); - }); - - it('should list no shard files when directory is empty', () => { - const sw = new ShardedWal({ - dir: '/empty', - format: { - baseName: 'test-wal', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - // Create the group directory (matches actual getShardedGroupId() output) - vol.mkdirSync('/empty/20231114-221320-000', { recursive: true }); - const files = (sw as any).shardFiles(); - expect(files).toEqual([]); - }); - - it('should list shard files matching extension', () => { - // Note: Real shard IDs look like "1704067200000.12345.1.1" (timestamp.pid.threadId.count) - // These test IDs use simplified format "001.1", "002.2" for predictability - vol.fromJSON({ - '/shards/20231114-221320-000/trace.19700101-000820-001.1.log': 'content1', - '/shards/20231114-221320-000/trace.19700101-000820-002.2.log': 'content2', - '/shards/other.txt': 'not a shard', - }); - - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'trace', - walExtension: '.log', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - const files = (sw as any).shardFiles(); - - expect(files).toHaveLength(2); - expect(files).toEqual( - expect.arrayContaining([ - expect.pathToMatch( - '/shards/20231114-221320-000/trace.19700101-000820-001.1.log', - ), - expect.pathToMatch( - '/shards/20231114-221320-000/trace.19700101-000820-002.2.log', - ), - ]), - ); - }); - - it('should finalize empty shards to empty result', () => { - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'final', - finalExtension: '.json', - finalizer: records => `${JSON.stringify(records)}\n`, - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - // Create the group directory - vol.mkdirSync('/shards/20231114-221320-000', { recursive: true }); - sw.finalize(); - - expect( - read('/shards/20231114-221320-000/final.20231114-221320-000.json'), - ).toBe('[]\n'); - }); - - it('should finalize multiple shards into single file', () => { - vol.fromJSON({ - '/shards/20231114-221320-000/merged.20240101-120000-001.1.log': - 'record1\n', - '/shards/20231114-221320-000/merged.20240101-120000-002.2.log': - 'record2\n', - }); - - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'merged', - walExtension: '.log', - finalExtension: '.json', - finalizer: records => `${JSON.stringify(records)}\n`, - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - sw.finalize(); - - const result = JSON.parse( - read( - '/shards/20231114-221320-000/merged.20231114-221320-000.json', - ).trim(), - ); - expect(result).toEqual(['record1', 'record2']); - }); - - it('should handle invalid entries during finalize', () => { - vol.fromJSON({ - '/shards/20231114-221320-000/final.20240101-120000-001.1.log': 'valid\n', - '/shards/20231114-221320-000/final.20240101-120000-002.2.log': - 'invalid\n', - }); - const tolerantCodec = createTolerantCodec({ - encode: (s: string) => s, - decode: (s: string) => { - if (s === 'invalid') throw new Error('Bad record'); - return s; - }, - }); - - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'final', - walExtension: '.log', - finalExtension: '.json', - codec: tolerantCodec, - finalizer: records => `${JSON.stringify(records)}\n`, - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - sw.finalize(); - - const result = JSON.parse( - read('/shards/20231114-221320-000/final.20231114-221320-000.json').trim(), - ); - expect(result).toHaveLength(2); - expect(result[0]).toBe('valid'); - expect(result[1]).toEqual({ __invalid: true, raw: 'invalid' }); - }); - - it('should cleanup shard files', () => { - vol.fromJSON({ - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': - 'content1', - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.2.log': - 'content2', - }); - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'test', - walExtension: '.log', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - expect(vol.toJSON()).toStrictEqual({ - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': - 'content1', - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.2.log': - 'content2', - }); - - sw.cleanup(); - - expect(vol.toJSON()).toStrictEqual({}); - }); - - it('should handle cleanup when some shard files do not exist', () => { - vol.fromJSON({ - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': - 'content1', - }); - - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'test', - walExtension: '.log', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - vol.unlinkSync( - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log', - ); - expect(() => sw.cleanup()).not.toThrow(); - }); - - it('should use custom options in finalizer', () => { - vol.fromJSON({ - '/shards/20231114-221320-000/final.20231114-221320-000.10001.2.1.log': - 'record1\n', - }); - - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'final', - walExtension: '.log', - finalExtension: '.json', - finalizer: (records, opt) => - `${JSON.stringify({ records, meta: opt })}\n`, - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - sw.finalize({ version: '1.0', compressed: true }); - - const result = JSON.parse( - read('/shards/20231114-221320-000/final.20231114-221320-000.json'), + expect(result.finalizer(records)).toBe( + '{"id":1,"name":"test"}\n{"id":2,"name":"test2"}\n', ); - expect(result.records).toEqual(['record1']); - expect(result.meta).toEqual({ version: '1.0', compressed: true }); }); });