diff --git a/.github/workflows/ci/docker-entrypoint-initdb.d/020-wal2json.sh b/.github/workflows/ci/docker-entrypoint-initdb.d/020-wal2json.sh index 8526948a7..302ba98fc 100755 --- a/.github/workflows/ci/docker-entrypoint-initdb.d/020-wal2json.sh +++ b/.github/workflows/ci/docker-entrypoint-initdb.d/020-wal2json.sh @@ -5,7 +5,7 @@ set -e # https://github.com/debezium/docker-images/blob/master/postgres/11/Dockerfile PROTOC_VERSION=1.3 -WAL2JSON_COMMIT_ID=c54d89649c3fe5e0aa79c3a87493935232e962a7 +WAL2JSON_COMMIT_ID=wal2json_2_6 USE_PGXS=1 export PGUSER=postgres export PGPASSWORD=postgres @@ -13,13 +13,14 @@ createuser --superuser root apt-get update apt-get install -f -y --no-install-recommends \ - software-properties-common \ + apt-transport-https \ + ca-certificates \ build-essential \ pkg-config \ git \ postgresql-server-dev-$PG_MAJOR -add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" -apt-get update +# add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" +# apt-get update rm -rf /var/lib/apt/lists/* cd / diff --git a/packages/lds/__tests__/helpers.ts b/packages/lds/__tests__/helpers.ts index c659cc0e6..af786ec85 100644 --- a/packages/lds/__tests__/helpers.ts +++ b/packages/lds/__tests__/helpers.ts @@ -1,5 +1,5 @@ import * as pg from "pg"; -import PgLogicalDecoding from "../src/pg-logical-decoding"; +import PgLogicalDecoding, { LdsOptions } from "../src/pg-logical-decoding"; export const DATABASE_URL = process.env.LDS_TEST_DATABASE_URL || "lds_test"; export { PoolClient } from "pg"; @@ -46,12 +46,14 @@ export async function withLdAndClient( } export async function withLd( - callback: (ld: PgLogicalDecoding) => Promise + callback: (ld: PgLogicalDecoding) => Promise, + options?: LdsOptions ): Promise { const slotName = "get_ld"; const ld = new PgLogicalDecoding(DATABASE_URL, { slotName, temporary: true, + ...options, }); await ld.createSlot(); try { diff --git a/packages/lds/__tests__/index.test.ts b/packages/lds/__tests__/index.test.ts index 101ed660e..716d84a14 100644 --- a/packages/lds/__tests__/index.test.ts +++ b/packages/lds/__tests__/index.test.ts @@ -28,11 +28,12 @@ test("gets expected data, cleans up, doesn't receive data after cleanup", async await sub.close(); expect(mockCallback).toHaveBeenCalledTimes(4); // Now run a new mutation, and expect the mockCallback not to have been called - await withClient(DATABASE_URL, pgClient => - pgClient.query( + await withClient(DATABASE_URL, async pgClient => { + await pgClient.query( "insert into app_public.foo(name) values ('temp') returning id" - ) - ); + ); + await sleep(100); + }); expect(mockCallback).toHaveBeenCalledTimes(4); const { diff --git a/packages/lds/__tests__/pg-logical-decoding.test.ts b/packages/lds/__tests__/pg-logical-decoding.test.ts index 4960b7f74..3968c221f 100644 --- a/packages/lds/__tests__/pg-logical-decoding.test.ts +++ b/packages/lds/__tests__/pg-logical-decoding.test.ts @@ -1,5 +1,13 @@ -import PgLogicalDecoding from "../src/pg-logical-decoding"; -import { tryDropSlot, DATABASE_URL, query, withLdAndClient } from "./helpers"; +import * as assert from "assert"; +import * as pg from "pg"; +import PgLogicalDecoding, { LdsOptions } from "../src/pg-logical-decoding"; +import { + tryDropSlot, + DATABASE_URL, + query, + withLdAndClient, + withLd, +} from "./helpers"; const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); @@ -271,3 +279,90 @@ test("multiple notifications", () => const changes3 = await ld.getChanges(); expect(changes3.length).toEqual(0); })); + +describe("parse results for", () => { + async function getUpdate(options: LdsOptions) { + const { + rows: [{ id }], + } = await query( + `insert into app_public.foo(name) values ('john doe') returning id;` + ); + return withLd(async ld => { + await query( + `update app_public.foo set name = 'jane doe' where id = $1;`, + [id] + ); + const rows = await ld.getChanges(); + const change = rows[0].data.change[0]; + assert.strictEqual(change.kind, "update" as const); + return { + id, + keys: ld.changeToPk(change), + data: ld.changeToRecord(change), + change, + }; + }, options); + } + + test("options without `types` should contain raw wal2json output", async () => { + const { id, keys, data } = await getUpdate({}); + expect(keys).toEqual([id]); + expect(data).toEqual({ + id, + name: "jane doe", + created_at: expect.any(String), // .stringMatching(isoDateRegex) + updated_at: expect.any(String), // .stringMatching(isoDateRegex) + }); + }); + test("options with `types` set to pg-types should parse output", async () => { + const getTypeParser = jest.fn(pg.types.getTypeParser); // like jest.spyOn(pg.types, "getTypeParser") but not globally shared + const { id, keys, data } = await getUpdate({ + types: { getTypeParser }, + }); + expect(keys).toEqual([id]); + expect(data.name).toEqual("jane doe"); + expect(data.created_at).toEqual(expect.any(Date)); + expect(data.updated_at).toEqual(expect.any(Date)); + expect(getTypeParser).toHaveBeenCalledTimes(5); + expect(getTypeParser.mock.calls).toEqual([ + // in changeToPk (id) + [pg.types.builtins.INT4, "text"], + // in changeToRecord (id, name, created_at, updated_at) + [pg.types.builtins.INT4, "text"], + [pg.types.builtins.TEXT, "text"], + [pg.types.builtins.TIMESTAMPTZ, "text"], + [pg.types.builtins.TIMESTAMPTZ, "text"], + ]); + }); + test("options with `include-type-oids` overwritten should not have been parsed by `types`", async () => { + const getTypeParser = jest.fn(); + const { id, keys } = await getUpdate({ + types: { getTypeParser }, + params: { "include-type-oids": "f" }, + }); + expect(keys).toEqual([String(id)]); // `numeric-data-types-as-string` still enabled + expect(getTypeParser).not.toHaveBeenCalled(); + }); + test("options with `types` set to pg-types should ignore numbers in output", async () => { + const { id, keys, data } = await getUpdate({ + types: pg.types, + params: { "numeric-data-types-as-string": "f" }, + }); + expect(keys).toEqual([id]); + expect(data.name).toEqual("jane doe"); + expect(data.created_at).toEqual(expect.any(Date)); + }); + test("options with `include-pk` and `include-types` set the change should have the respective properties", async () => { + const { change } = await getUpdate({ + params: { "include-pk": "t", "include-types": "t" }, + }); + expect(change.columntypes).toEqual([ + "integer", + "text", + "timestamp with time zone", + "timestamp with time zone", + ]); + expect(change).toHaveProperty("pk"); + expect((change as any).pk.pknames).toEqual(["id"]); + }); +}); diff --git a/packages/lds/src/index.ts b/packages/lds/src/index.ts index 4e963da2d..795fa1d05 100755 --- a/packages/lds/src/index.ts +++ b/packages/lds/src/index.ts @@ -1,15 +1,10 @@ /* eslint-disable no-console,curly */ -import PgLogicalDecoding, { - changeToRecord, - changeToPk, -} from "./pg-logical-decoding"; +import PgLogicalDecoding, { LdsOptions } from "./pg-logical-decoding"; import FatalError from "./fatal-error"; -export interface Options { - slotName?: string; - tablePattern?: string; +export interface Options extends LdsOptions { + /** Number of milliseconds between polls. Defaults to `200`. */ sleepDuration?: number; - temporary?: boolean; } const DROP_STALE_SLOTS_INTERVAL = 15 * 60 * 1000; @@ -57,18 +52,9 @@ export default async function subscribeToLogicalDecoding( callback: AnnounceCallback, options: Options = {} ): Promise { - const { - slotName = "postgraphile", - tablePattern = "*.*", - sleepDuration = 200, - temporary = false, - } = options; + const { sleepDuration = 200 } = options; let lastLsn: string | null = null; - const client = new PgLogicalDecoding(connectionString, { - tablePattern, - slotName, - temporary, - }); + const client = new PgLogicalDecoding(connectionString, options); // We must do this before we create the temporary slot, since errors will release a temporary slot immediately await client.dropStaleSlots(); @@ -81,7 +67,7 @@ export default async function subscribeToLogicalDecoding( } else if (e.code === "42710") { // Slot already exists; ignore. } else if (e.code === "42602") { - throw new FatalError(`Invalid slot name '${slotName}'?`, e); + throw new FatalError(`Invalid slot name '${client.slotName}'?`, e); } else { console.error( "An unhandled error occurred when attempting to create the replication slot:" @@ -118,23 +104,24 @@ export default async function subscribeToLogicalDecoding( _: "insertC", schema, table, - data: changeToRecord(change), + data: client.changeToRecord(change), }; callback(announcement); } else if (change.kind === "update") { + const data = client.changeToRecord(change); const rowAnnouncement: UpdateRowAnnouncement = { _: "update", schema, table, - keys: changeToPk(change), - data: changeToRecord(change), + keys: client.changeToPk(change), + data, }; callback(rowAnnouncement); const collectionAnnouncement: UpdateCollectionAnnouncement = { _: "updateC", schema, table, - data: changeToRecord(change), + data, }; callback(collectionAnnouncement); } else if (change.kind === "delete") { @@ -142,7 +129,7 @@ export default async function subscribeToLogicalDecoding( _: "delete", schema, table, - keys: changeToPk(change), + keys: client.changeToPk(change), }; callback(announcement); } else { @@ -151,7 +138,7 @@ export default async function subscribeToLogicalDecoding( } } } - if (!temporary && nextStaleCheck < Date.now()) { + if (!client.temporary && nextStaleCheck < Date.now()) { // Roughly every 15 minutes, drop stale slots. nextStaleCheck = Date.now() + DROP_STALE_SLOTS_INTERVAL; client.dropStaleSlots().catch(e => { diff --git a/packages/lds/src/pg-logical-decoding.ts b/packages/lds/src/pg-logical-decoding.ts index 62f5dc8cf..5e7fa0fa2 100644 --- a/packages/lds/src/pg-logical-decoding.ts +++ b/packages/lds/src/pg-logical-decoding.ts @@ -16,7 +16,8 @@ declare module "pg" { */ interface Keys { keynames: Array; - keytypes: Array; + keytypes?: Array; // with `include-types` option (default true) + keytypeoids?: Array; // with `include-type-oids` option (default false) keyvalues: Array; } @@ -34,7 +35,8 @@ export interface InsertChange extends Change { // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L969 columnnames: Array; - columntypes: Array; + columntypes?: Array; // with `include-types` option (default true) + columntypeoids?: Array; // with `include-type-oids` option (default false) columnvalues: Array; } @@ -43,7 +45,8 @@ export interface UpdateChange extends Change { // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L973 columnnames: Array; - columntypes: Array; + columntypes?: Array; // with `include-types` option (default true) + columntypeoids?: Array; // with `include-type-oids` option (default false) columnvalues: Array; // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L992-L1003 @@ -57,18 +60,6 @@ export interface DeleteChange extends Change { oldkeys: Keys; } -export const changeToRecord = (change: InsertChange | UpdateChange) => { - const { columnnames, columnvalues } = change; - return columnnames.reduce((memo, name, i) => { - memo[name] = columnvalues[i]; - return memo; - }, {}); -}; - -export const changeToPk = (change: UpdateChange | DeleteChange) => { - return change.oldkeys.keyvalues; -}; - interface Payload { lsn: string; data: { @@ -81,37 +72,67 @@ const toLsnData = ([lsn, data]: [string, string]): Payload => ({ data: JSON.parse(data), }); -interface Options { +export interface LdsOptions { + /** The 'add-tables' wal2json parameter. Defaults to `*.*`. */ tablePattern?: string; + /** The [replication slot](https://www.postgresql.org/docs/current/logicaldecoding-explanation.html#LOGICALDECODING-REPLICATION-SLOTS) identifier. Defaults to `postgraphile`. */ slotName?: string; + /** Whether `.createSlot()` should create a temporary replication slot which will be limited to the `client` session and gets cleaned up automatically. Defaults to `false`. */ temporary?: boolean; + /** (Custom) [type parsers](https://node-postgres.com/features/queries#types) to deserialise the wal2json column string values. Pass `pg.types` to get the default type parsing. Defaults to `undefined`, that is raw values will get emitted. */ + types?: pg.CustomTypesConfig; + /** Extra [parameters to be passed to wal2json](https://github.com/eulerto/wal2json?tab=readme-ov-file#parameters). Use e.g. `{'numeric-data-types-as-string', 't'}` to make the type parsers apply to numeric values. */ + params?: Partial>; } export default class PgLogicalDecoding extends EventEmitter { public readonly slotName: string; public readonly temporary: boolean; - private connectionString: string; - private tablePattern: string; + private readonly getChangesQueryText: string; + private readonly parse: (value: any, typeOid: number) => any; private pool: pg.Pool | null; private client: Promise | null; - constructor(connectionString: string, options?: Options) { + constructor(connectionString: string, options?: LdsOptions) { super(); - this.connectionString = connectionString; const { tablePattern = "*.*", slotName = "postgraphile", temporary = false, + types, + params, } = options || {}; - this.tablePattern = tablePattern; this.slotName = slotName; this.temporary = temporary; + const parametersSql = Object.entries({ + "add-tables": tablePattern != "*.*" ? tablePattern : null, + "include-types": "f", // type names are unnecessary + "include-type-oids": types ? "t" : null, + "numeric-data-types-as-string": types ? "t" : null, + ...params, + }) + .flatMap(entry => (typeof entry[1] == "string" ? entry : [])) + .map(pg.Client.prototype.escapeLiteral) + .join(", "); + this.getChangesQueryText = `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, ${parametersSql})`; + this.parse = types + ? (value: any, typeOid: number) => { + if (value === null) return null; + // wal2json always outputs `bool`s as boolean + if (typeof value === "boolean") return value; // assert: typeOid === pg.types.builtins.BOOL + // wal2json outputs numeric data as numbers, unless `numeric-data-types-as-string` is set + if (typeof value === "number") return value; + const parser = types.getTypeParser(typeOid, "text"); + return parser(value); + } + : (value, _) => value; // We just use the pool to get better error handling this.pool = new pg.Pool({ - connectionString: this.connectionString, + connectionString, max: 1, }); this.pool.on("error", this.onPoolError); + this.client = null; } public async dropStaleSlots() { @@ -172,9 +193,9 @@ export default class PgLogicalDecoding extends EventEmitter { const client = await this.getClient(); await this.trackSelf(client); try { - const { rows } = await client.query({ - text: `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, 'add-tables', $4::text)`, - values: [this.slotName, uptoLsn, uptoNchanges, this.tablePattern], + const { rows } = await client.query<[lsn: string, data: string]>({ + text: this.getChangesQueryText, + values: [this.slotName, uptoLsn, uptoNchanges], rowMode: "array", }); return rows.map(toLsnData); @@ -192,6 +213,31 @@ export default class PgLogicalDecoding extends EventEmitter { } } + public changeToRecord( + change: InsertChange | UpdateChange + ): Record { + const { columnnames, columnvalues, columntypeoids } = change; + return columnnames.reduce>( + columntypeoids + ? (memo, name, i) => { + memo[name] = this.parse(columnvalues[i], columntypeoids[i]); + return memo; + } + : (memo, name, i) => { + memo[name] = columnvalues[i]; + return memo; + }, + {} + ); + } + + public changeToPk(change: UpdateChange | DeleteChange): any[] { + const { keyvalues, keytypeoids } = change.oldkeys; + return keytypeoids + ? keyvalues.map((value, i) => this.parse(value, keytypeoids[i])) + : keyvalues; + } + public async close() { if (!this.temporary) { const client = await this.getClient();