From 1f252e722beb490e2c62fbb067a956750ea4a852 Mon Sep 17 00:00:00 2001 From: Andreas Bergmaier Date: Tue, 10 Mar 2026 23:43:57 +0100 Subject: [PATCH 1/6] fix(lds): Parse wal2json string values according to their type [wal2json serialises all (non-numeric, non-boolean, non-null) database values as strings](https://github.com/eulerto/wal2json/blob/f8bab055fafc196539c9652b6b5584228498c6eb/wal2json.c#L1337-L1384) into the JSON, and so far we failed to take their types into account when converting them to records. This now * makes wal2json output strings as far as possible (in particular for numeric types whose values did not always have a JSON number representation) * parses the values [just like `node-postgres` does it](https://github.com/brianc/node-postgres/blob/32ec730b51a1fd73bf97d65105c949729fa9ec80/packages/pg/lib/result.js#L68-L73) * instructs wal2json to output the type oids instead of the type names for this --- packages/lds/src/pg-logical-decoding.ts | 27 ++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/packages/lds/src/pg-logical-decoding.ts b/packages/lds/src/pg-logical-decoding.ts index 62f5dc8cf..3f1ef2c7d 100644 --- a/packages/lds/src/pg-logical-decoding.ts +++ b/packages/lds/src/pg-logical-decoding.ts @@ -17,6 +17,7 @@ declare module "pg" { interface Keys { keynames: Array; keytypes: Array; + keytypeoids: Array; keyvalues: Array; } @@ -34,7 +35,8 @@ export interface InsertChange extends Change { // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L969 columnnames: Array; - columntypes: Array; + columntypes: Array; // with `include-types` option (default true) + columntypeoids: Array; // with `include-type-oids` option (default false) columnvalues: Array; } @@ -43,7 +45,8 @@ export interface UpdateChange extends Change { // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L973 columnnames: Array; - columntypes: Array; + columntypes: Array; // with `include-types` option (default true) + columntypeoids: Array; // with `include-type-oids` option (default false) columnvalues: Array; // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L992-L1003 @@ -57,16 +60,26 @@ export interface DeleteChange extends Change { oldkeys: Keys; } +const parse = (value: any, typeOid: number) => { + if (value === null) return null; + // wal2json always outputs `bool`s as boolean, not as string, irregardless of `numeric-data-types-as-string`. + if (typeOid === pg.types.builtins.BOOL) return value; + // FIXME: this should use `client.getTypeParser` or have some other non-global option to configure type parsing + const parser = pg.types.getTypeParser(typeOid, "text"); + return parser(value); +}; + export const changeToRecord = (change: InsertChange | UpdateChange) => { - const { columnnames, columnvalues } = change; - return columnnames.reduce((memo, name, i) => { - memo[name] = columnvalues[i]; + const { columnnames, columnvalues, columntypeoids } = change; + return columnnames.reduce>((memo, name, i) => { + memo[name] = parse(columnvalues[i], columntypeoids[i]); return memo; }, {}); }; export const changeToPk = (change: UpdateChange | DeleteChange) => { - return change.oldkeys.keyvalues; + const { keyvalues, keytypeoids } = change.oldkeys; + return keyvalues.map((value, i) => parse(value, keytypeoids[i])); }; interface Payload { @@ -173,7 +186,7 @@ export default class PgLogicalDecoding extends EventEmitter { await this.trackSelf(client); try { const { rows } = await client.query({ - text: `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, 'add-tables', $4::text)`, + text: `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, 'add-tables', $4::text, 'include-types', 'f', 'include-type-oids', 't', 'numeric-data-types-as-string', 't')`, values: [this.slotName, uptoLsn, uptoNchanges, this.tablePattern], rowMode: "array", }); From 6af5c2bbeb443f10ace6a733376b33c853e4769a Mon Sep 17 00:00:00 2001 From: Andreas Bergmaier Date: Thu, 12 Mar 2026 00:53:46 +0100 Subject: [PATCH 2/6] refactor(lds)/docs(lds): forward options from `subscribeToLogicalDecoding` to `PgLogicalDecoding` and add doc comments to describe them --- packages/lds/src/index.ts | 24 +++++++----------------- packages/lds/src/pg-logical-decoding.ts | 7 +++++-- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/packages/lds/src/index.ts b/packages/lds/src/index.ts index 4e963da2d..51049fcaa 100755 --- a/packages/lds/src/index.ts +++ b/packages/lds/src/index.ts @@ -2,14 +2,13 @@ import PgLogicalDecoding, { changeToRecord, changeToPk, + LdsOptions, } from "./pg-logical-decoding"; import FatalError from "./fatal-error"; -export interface Options { - slotName?: string; - tablePattern?: string; +export interface Options extends LdsOptions { + /** Number of milliseconds between polls. Defaults to `200`. */ sleepDuration?: number; - temporary?: boolean; } const DROP_STALE_SLOTS_INTERVAL = 15 * 60 * 1000; @@ -57,18 +56,9 @@ export default async function subscribeToLogicalDecoding( callback: AnnounceCallback, options: Options = {} ): Promise { - const { - slotName = "postgraphile", - tablePattern = "*.*", - sleepDuration = 200, - temporary = false, - } = options; + const { sleepDuration = 200 } = options; let lastLsn: string | null = null; - const client = new PgLogicalDecoding(connectionString, { - tablePattern, - slotName, - temporary, - }); + const client = new PgLogicalDecoding(connectionString, options); // We must do this before we create the temporary slot, since errors will release a temporary slot immediately await client.dropStaleSlots(); @@ -81,7 +71,7 @@ export default async function subscribeToLogicalDecoding( } else if (e.code === "42710") { // Slot already exists; ignore. } else if (e.code === "42602") { - throw new FatalError(`Invalid slot name '${slotName}'?`, e); + throw new FatalError(`Invalid slot name '${client.slotName}'?`, e); } else { console.error( "An unhandled error occurred when attempting to create the replication slot:" @@ -151,7 +141,7 @@ export default async function subscribeToLogicalDecoding( } } } - if (!temporary && nextStaleCheck < Date.now()) { + if (!client.temporary && nextStaleCheck < Date.now()) { // Roughly every 15 minutes, drop stale slots. nextStaleCheck = Date.now() + DROP_STALE_SLOTS_INTERVAL; client.dropStaleSlots().catch(e => { diff --git a/packages/lds/src/pg-logical-decoding.ts b/packages/lds/src/pg-logical-decoding.ts index 3f1ef2c7d..4aafe7c91 100644 --- a/packages/lds/src/pg-logical-decoding.ts +++ b/packages/lds/src/pg-logical-decoding.ts @@ -94,9 +94,12 @@ const toLsnData = ([lsn, data]: [string, string]): Payload => ({ data: JSON.parse(data), }); -interface Options { +export interface LdsOptions { + /** The 'add-tables' wal2json parameter. Defaults to `*.*`. */ tablePattern?: string; + /** The [replication slot](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html#LOGICALDECODING-REPLICATION-SLOTS) identifier. Defaults to `postgraphile`. */ slotName?: string; + /** Whether `.createSlot()` should create a temporary replication slot which will be limited to the `client` session and gets cleaned up automatically. Defaults to `false`. */ temporary?: boolean; } @@ -108,7 +111,7 @@ export default class PgLogicalDecoding extends EventEmitter { private pool: pg.Pool | null; private client: Promise | null; - constructor(connectionString: string, options?: Options) { + constructor(connectionString: string, options?: LdsOptions) { super(); this.connectionString = connectionString; const { From 6f19c6ffc59c721e15b110efe4cfe7278857389f Mon Sep 17 00:00:00 2001 From: Andreas Bergmaier Date: Thu, 12 Mar 2026 00:59:21 +0100 Subject: [PATCH 3/6] feat(lds): make type parsing configurable * Refactor `changeToRecord` and `changeToPk` from exports into methods of `PgLogicalDecoding` * add `LdsOptions.types` to choose parsers, use noop identity function by default * call `changeToRecord` only once on updates --- packages/lds/src/index.ts | 17 ++++---- packages/lds/src/pg-logical-decoding.ts | 52 +++++++++++++++---------- 2 files changed, 38 insertions(+), 31 deletions(-) diff --git a/packages/lds/src/index.ts b/packages/lds/src/index.ts index 51049fcaa..795fa1d05 100755 --- a/packages/lds/src/index.ts +++ b/packages/lds/src/index.ts @@ -1,9 +1,5 @@ /* eslint-disable no-console,curly */ -import PgLogicalDecoding, { - changeToRecord, - changeToPk, - LdsOptions, -} from "./pg-logical-decoding"; +import PgLogicalDecoding, { LdsOptions } from "./pg-logical-decoding"; import FatalError from "./fatal-error"; export interface Options extends LdsOptions { @@ -108,23 +104,24 @@ export default async function subscribeToLogicalDecoding( _: "insertC", schema, table, - data: changeToRecord(change), + data: client.changeToRecord(change), }; callback(announcement); } else if (change.kind === "update") { + const data = client.changeToRecord(change); const rowAnnouncement: UpdateRowAnnouncement = { _: "update", schema, table, - keys: changeToPk(change), - data: changeToRecord(change), + keys: client.changeToPk(change), + data, }; callback(rowAnnouncement); const collectionAnnouncement: UpdateCollectionAnnouncement = { _: "updateC", schema, table, - data: changeToRecord(change), + data, }; callback(collectionAnnouncement); } else if (change.kind === "delete") { @@ -132,7 +129,7 @@ export default async function subscribeToLogicalDecoding( _: "delete", schema, table, - keys: changeToPk(change), + keys: client.changeToPk(change), }; callback(announcement); } else { diff --git a/packages/lds/src/pg-logical-decoding.ts b/packages/lds/src/pg-logical-decoding.ts index 4aafe7c91..95eb4b1d6 100644 --- a/packages/lds/src/pg-logical-decoding.ts +++ b/packages/lds/src/pg-logical-decoding.ts @@ -60,27 +60,7 @@ export interface DeleteChange extends Change { oldkeys: Keys; } -const parse = (value: any, typeOid: number) => { - if (value === null) return null; - // wal2json always outputs `bool`s as boolean, not as string, irregardless of `numeric-data-types-as-string`. - if (typeOid === pg.types.builtins.BOOL) return value; - // FIXME: this should use `client.getTypeParser` or have some other non-global option to configure type parsing - const parser = pg.types.getTypeParser(typeOid, "text"); - return parser(value); -}; - -export const changeToRecord = (change: InsertChange | UpdateChange) => { - const { columnnames, columnvalues, columntypeoids } = change; - return columnnames.reduce>((memo, name, i) => { - memo[name] = parse(columnvalues[i], columntypeoids[i]); - return memo; - }, {}); -}; - -export const changeToPk = (change: UpdateChange | DeleteChange) => { - const { keyvalues, keytypeoids } = change.oldkeys; - return keyvalues.map((value, i) => parse(value, keytypeoids[i])); -}; +const id = (value: T): T => value; interface Payload { lsn: string; @@ -101,6 +81,8 @@ export interface LdsOptions { slotName?: string; /** Whether `.createSlot()` should create a temporary replication slot which will be limited to the `client` session and gets cleaned up automatically. Defaults to `false`. */ temporary?: boolean; + /** (Custom) [type parsers](https://node-postgres.com/features/queries#types) to deserialise the wal2json column string values. Pass `pg.types` to get the default type parsing. Defaults to `undefined`, that is raw values will get emitted. */ + types?: pg.CustomTypesConfig; } export default class PgLogicalDecoding extends EventEmitter { @@ -110,6 +92,7 @@ export default class PgLogicalDecoding extends EventEmitter { private tablePattern: string; private pool: pg.Pool | null; private client: Promise | null; + private readonly parse: (value: any, typeOid: number) => any; constructor(connectionString: string, options?: LdsOptions) { super(); @@ -118,10 +101,20 @@ export default class PgLogicalDecoding extends EventEmitter { tablePattern = "*.*", slotName = "postgraphile", temporary = false, + types, } = options || {}; this.tablePattern = tablePattern; this.slotName = slotName; this.temporary = temporary; + this.parse = types + ? (value: any, typeOid: number) => { + if (value === null) return null; + // wal2json always outputs `bool`s as boolean, not as string, irregardless of `numeric-data-types-as-string`. + if (typeOid === pg.types.builtins.BOOL) return value; + const parser = types.getTypeParser(typeOid, "text"); + return parser(value); + } + : id; // We just use the pool to get better error handling this.pool = new pg.Pool({ connectionString: this.connectionString, @@ -208,6 +201,23 @@ export default class PgLogicalDecoding extends EventEmitter { } } + public changeToRecord( + change: InsertChange | UpdateChange + ): Record { + const { columnnames, columnvalues, columntypeoids } = change; + return columnnames.reduce>((memo, name, i) => { + memo[name] = this.parse(columnvalues[i], columntypeoids[i]); + return memo; + }, {}); + } + + public changeToPk(change: UpdateChange | DeleteChange): any[] { + const { keyvalues, keytypeoids } = change.oldkeys; + return this.parse == id + ? keyvalues + : keyvalues.map((value, i) => this.parse(value, keytypeoids[i])); + } + public async close() { if (!this.temporary) { const client = await this.getClient(); From b51377a111507091ff173aca55d0acd8b5ed6203 Mon Sep 17 00:00:00 2001 From: Andreas Bergmaier Date: Thu, 12 Mar 2026 01:43:50 +0100 Subject: [PATCH 4/6] feat(lds): make wal2json parameters configurable * add `LdsOptions.params` * fix `numeric-data-types-as-string` to be set only if `types` are passed * adjust the `parse` function accordingly, to handle `number` values without a type parser * set `include-type-oids` only if `types` are passed * adjust `changeToRecord`/`changeToPk` to call `parse` only if type oids are present --- packages/lds/src/pg-logical-decoding.ts | 74 ++++++++++++++++--------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/packages/lds/src/pg-logical-decoding.ts b/packages/lds/src/pg-logical-decoding.ts index 95eb4b1d6..5e7fa0fa2 100644 --- a/packages/lds/src/pg-logical-decoding.ts +++ b/packages/lds/src/pg-logical-decoding.ts @@ -16,8 +16,8 @@ declare module "pg" { */ interface Keys { keynames: Array; - keytypes: Array; - keytypeoids: Array; + keytypes?: Array; // with `include-types` option (default true) + keytypeoids?: Array; // with `include-type-oids` option (default false) keyvalues: Array; } @@ -35,8 +35,8 @@ export interface InsertChange extends Change { // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L969 columnnames: Array; - columntypes: Array; // with `include-types` option (default true) - columntypeoids: Array; // with `include-type-oids` option (default false) + columntypes?: Array; // with `include-types` option (default true) + columntypeoids?: Array; // with `include-type-oids` option (default false) columnvalues: Array; } @@ -45,8 +45,8 @@ export interface UpdateChange extends Change { // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L973 columnnames: Array; - columntypes: Array; // with `include-types` option (default true) - columntypeoids: Array; // with `include-type-oids` option (default false) + columntypes?: Array; // with `include-types` option (default true) + columntypeoids?: Array; // with `include-type-oids` option (default false) columnvalues: Array; // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L992-L1003 @@ -60,8 +60,6 @@ export interface DeleteChange extends Change { oldkeys: Keys; } -const id = (value: T): T => value; - interface Payload { lsn: string; data: { @@ -83,44 +81,58 @@ export interface LdsOptions { temporary?: boolean; /** (Custom) [type parsers](https://node-postgres.com/features/queries#types) to deserialise the wal2json column string values. Pass `pg.types` to get the default type parsing. Defaults to `undefined`, that is raw values will get emitted. */ types?: pg.CustomTypesConfig; + /** Extra [parameters to be passed to wal2json](https://github.com/eulerto/wal2json?tab=readme-ov-file#parameters). Use e.g. `{'numeric-data-types-as-string', 't'}` to make the type parsers apply to numeric values. */ + params?: Partial>; } export default class PgLogicalDecoding extends EventEmitter { public readonly slotName: string; public readonly temporary: boolean; - private connectionString: string; - private tablePattern: string; + private readonly getChangesQueryText: string; + private readonly parse: (value: any, typeOid: number) => any; private pool: pg.Pool | null; private client: Promise | null; - private readonly parse: (value: any, typeOid: number) => any; constructor(connectionString: string, options?: LdsOptions) { super(); - this.connectionString = connectionString; const { tablePattern = "*.*", slotName = "postgraphile", temporary = false, types, + params, } = options || {}; - this.tablePattern = tablePattern; this.slotName = slotName; this.temporary = temporary; + const parametersSql = Object.entries({ + "add-tables": tablePattern != "*.*" ? tablePattern : null, + "include-types": "f", // type names are unnecessary + "include-type-oids": types ? "t" : null, + "numeric-data-types-as-string": types ? "t" : null, + ...params, + }) + .flatMap(entry => (typeof entry[1] == "string" ? entry : [])) + .map(pg.Client.prototype.escapeLiteral) + .join(", "); + this.getChangesQueryText = `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, ${parametersSql})`; this.parse = types ? (value: any, typeOid: number) => { if (value === null) return null; - // wal2json always outputs `bool`s as boolean, not as string, irregardless of `numeric-data-types-as-string`. - if (typeOid === pg.types.builtins.BOOL) return value; + // wal2json always outputs `bool`s as boolean + if (typeof value === "boolean") return value; // assert: typeOid === pg.types.builtins.BOOL + // wal2json outputs numeric data as numbers, unless `numeric-data-types-as-string` is set + if (typeof value === "number") return value; const parser = types.getTypeParser(typeOid, "text"); return parser(value); } - : id; + : (value, _) => value; // We just use the pool to get better error handling this.pool = new pg.Pool({ - connectionString: this.connectionString, + connectionString, max: 1, }); this.pool.on("error", this.onPoolError); + this.client = null; } public async dropStaleSlots() { @@ -181,9 +193,9 @@ export default class PgLogicalDecoding extends EventEmitter { const client = await this.getClient(); await this.trackSelf(client); try { - const { rows } = await client.query({ - text: `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, 'add-tables', $4::text, 'include-types', 'f', 'include-type-oids', 't', 'numeric-data-types-as-string', 't')`, - values: [this.slotName, uptoLsn, uptoNchanges, this.tablePattern], + const { rows } = await client.query<[lsn: string, data: string]>({ + text: this.getChangesQueryText, + values: [this.slotName, uptoLsn, uptoNchanges], rowMode: "array", }); return rows.map(toLsnData); @@ -205,17 +217,25 @@ export default class PgLogicalDecoding extends EventEmitter { change: InsertChange | UpdateChange ): Record { const { columnnames, columnvalues, columntypeoids } = change; - return columnnames.reduce>((memo, name, i) => { - memo[name] = this.parse(columnvalues[i], columntypeoids[i]); - return memo; - }, {}); + return columnnames.reduce>( + columntypeoids + ? (memo, name, i) => { + memo[name] = this.parse(columnvalues[i], columntypeoids[i]); + return memo; + } + : (memo, name, i) => { + memo[name] = columnvalues[i]; + return memo; + }, + {} + ); } public changeToPk(change: UpdateChange | DeleteChange): any[] { const { keyvalues, keytypeoids } = change.oldkeys; - return this.parse == id - ? keyvalues - : keyvalues.map((value, i) => this.parse(value, keytypeoids[i])); + return keytypeoids + ? keyvalues.map((value, i) => this.parse(value, keytypeoids[i])) + : keyvalues; } public async close() { From a5226de1ff46e1765ed93570dd9cea579fce7026 Mon Sep 17 00:00:00 2001 From: Andreas Bergmaier Date: Thu, 12 Mar 2026 04:56:43 +0100 Subject: [PATCH 5/6] test(lds): add tests for `LdsOptions.types` and `.params` --- packages/lds/__tests__/helpers.ts | 6 +- packages/lds/__tests__/index.test.ts | 9 +- .../lds/__tests__/pg-logical-decoding.test.ts | 99 ++++++++++++++++++- 3 files changed, 106 insertions(+), 8 deletions(-) diff --git a/packages/lds/__tests__/helpers.ts b/packages/lds/__tests__/helpers.ts index c659cc0e6..af786ec85 100644 --- a/packages/lds/__tests__/helpers.ts +++ b/packages/lds/__tests__/helpers.ts @@ -1,5 +1,5 @@ import * as pg from "pg"; -import PgLogicalDecoding from "../src/pg-logical-decoding"; +import PgLogicalDecoding, { LdsOptions } from "../src/pg-logical-decoding"; export const DATABASE_URL = process.env.LDS_TEST_DATABASE_URL || "lds_test"; export { PoolClient } from "pg"; @@ -46,12 +46,14 @@ export async function withLdAndClient( } export async function withLd( - callback: (ld: PgLogicalDecoding) => Promise + callback: (ld: PgLogicalDecoding) => Promise, + options?: LdsOptions ): Promise { const slotName = "get_ld"; const ld = new PgLogicalDecoding(DATABASE_URL, { slotName, temporary: true, + ...options, }); await ld.createSlot(); try { diff --git a/packages/lds/__tests__/index.test.ts b/packages/lds/__tests__/index.test.ts index 101ed660e..716d84a14 100644 --- a/packages/lds/__tests__/index.test.ts +++ b/packages/lds/__tests__/index.test.ts @@ -28,11 +28,12 @@ test("gets expected data, cleans up, doesn't receive data after cleanup", async await sub.close(); expect(mockCallback).toHaveBeenCalledTimes(4); // Now run a new mutation, and expect the mockCallback not to have been called - await withClient(DATABASE_URL, pgClient => - pgClient.query( + await withClient(DATABASE_URL, async pgClient => { + await pgClient.query( "insert into app_public.foo(name) values ('temp') returning id" - ) - ); + ); + await sleep(100); + }); expect(mockCallback).toHaveBeenCalledTimes(4); const { diff --git a/packages/lds/__tests__/pg-logical-decoding.test.ts b/packages/lds/__tests__/pg-logical-decoding.test.ts index 4960b7f74..3968c221f 100644 --- a/packages/lds/__tests__/pg-logical-decoding.test.ts +++ b/packages/lds/__tests__/pg-logical-decoding.test.ts @@ -1,5 +1,13 @@ -import PgLogicalDecoding from "../src/pg-logical-decoding"; -import { tryDropSlot, DATABASE_URL, query, withLdAndClient } from "./helpers"; +import * as assert from "assert"; +import * as pg from "pg"; +import PgLogicalDecoding, { LdsOptions } from "../src/pg-logical-decoding"; +import { + tryDropSlot, + DATABASE_URL, + query, + withLdAndClient, + withLd, +} from "./helpers"; const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); @@ -271,3 +279,90 @@ test("multiple notifications", () => const changes3 = await ld.getChanges(); expect(changes3.length).toEqual(0); })); + +describe("parse results for", () => { + async function getUpdate(options: LdsOptions) { + const { + rows: [{ id }], + } = await query( + `insert into app_public.foo(name) values ('john doe') returning id;` + ); + return withLd(async ld => { + await query( + `update app_public.foo set name = 'jane doe' where id = $1;`, + [id] + ); + const rows = await ld.getChanges(); + const change = rows[0].data.change[0]; + assert.strictEqual(change.kind, "update" as const); + return { + id, + keys: ld.changeToPk(change), + data: ld.changeToRecord(change), + change, + }; + }, options); + } + + test("options without `types` should contain raw wal2json output", async () => { + const { id, keys, data } = await getUpdate({}); + expect(keys).toEqual([id]); + expect(data).toEqual({ + id, + name: "jane doe", + created_at: expect.any(String), // .stringMatching(isoDateRegex) + updated_at: expect.any(String), // .stringMatching(isoDateRegex) + }); + }); + test("options with `types` set to pg-types should parse output", async () => { + const getTypeParser = jest.fn(pg.types.getTypeParser); // like jest.spyOn(pg.types, "getTypeParser") but not globally shared + const { id, keys, data } = await getUpdate({ + types: { getTypeParser }, + }); + expect(keys).toEqual([id]); + expect(data.name).toEqual("jane doe"); + expect(data.created_at).toEqual(expect.any(Date)); + expect(data.updated_at).toEqual(expect.any(Date)); + expect(getTypeParser).toHaveBeenCalledTimes(5); + expect(getTypeParser.mock.calls).toEqual([ + // in changeToPk (id) + [pg.types.builtins.INT4, "text"], + // in changeToRecord (id, name, created_at, updated_at) + [pg.types.builtins.INT4, "text"], + [pg.types.builtins.TEXT, "text"], + [pg.types.builtins.TIMESTAMPTZ, "text"], + [pg.types.builtins.TIMESTAMPTZ, "text"], + ]); + }); + test("options with `include-type-oids` overwritten should not have been parsed by `types`", async () => { + const getTypeParser = jest.fn(); + const { id, keys } = await getUpdate({ + types: { getTypeParser }, + params: { "include-type-oids": "f" }, + }); + expect(keys).toEqual([String(id)]); // `numeric-data-types-as-string` still enabled + expect(getTypeParser).not.toHaveBeenCalled(); + }); + test("options with `types` set to pg-types should ignore numbers in output", async () => { + const { id, keys, data } = await getUpdate({ + types: pg.types, + params: { "numeric-data-types-as-string": "f" }, + }); + expect(keys).toEqual([id]); + expect(data.name).toEqual("jane doe"); + expect(data.created_at).toEqual(expect.any(Date)); + }); + test("options with `include-pk` and `include-types` set the change should have the respective properties", async () => { + const { change } = await getUpdate({ + params: { "include-pk": "t", "include-types": "t" }, + }); + expect(change.columntypes).toEqual([ + "integer", + "text", + "timestamp with time zone", + "timestamp with time zone", + ]); + expect(change).toHaveProperty("pk"); + expect((change as any).pk.pknames).toEqual(["id"]); + }); +}); From 49ec36f397f9c18a2a70c9e4695c4e64f36c6477 Mon Sep 17 00:00:00 2001 From: Andreas Bergmaier Date: Thu, 12 Mar 2026 05:22:29 +0100 Subject: [PATCH 6/6] test(lds): Upgrade wal2json version to 2.6 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … and try to ignore the `software-properties-common` package that is not available on Debian 13 and later? (I have no idea what I'm doing) --- .../ci/docker-entrypoint-initdb.d/020-wal2json.sh | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci/docker-entrypoint-initdb.d/020-wal2json.sh b/.github/workflows/ci/docker-entrypoint-initdb.d/020-wal2json.sh index 8526948a7..302ba98fc 100755 --- a/.github/workflows/ci/docker-entrypoint-initdb.d/020-wal2json.sh +++ b/.github/workflows/ci/docker-entrypoint-initdb.d/020-wal2json.sh @@ -5,7 +5,7 @@ set -e # https://github.com/debezium/docker-images/blob/master/postgres/11/Dockerfile PROTOC_VERSION=1.3 -WAL2JSON_COMMIT_ID=c54d89649c3fe5e0aa79c3a87493935232e962a7 +WAL2JSON_COMMIT_ID=wal2json_2_6 USE_PGXS=1 export PGUSER=postgres export PGPASSWORD=postgres @@ -13,13 +13,14 @@ createuser --superuser root apt-get update apt-get install -f -y --no-install-recommends \ - software-properties-common \ + apt-transport-https \ + ca-certificates \ build-essential \ pkg-config \ git \ postgresql-server-dev-$PG_MAJOR -add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" -apt-get update +# add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" +# apt-get update rm -rf /var/lib/apt/lists/* cd /