Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ set -e
# https://github.com/debezium/docker-images/blob/master/postgres/11/Dockerfile

PROTOC_VERSION=1.3
WAL2JSON_COMMIT_ID=c54d89649c3fe5e0aa79c3a87493935232e962a7
WAL2JSON_COMMIT_ID=wal2json_2_6
USE_PGXS=1
export PGUSER=postgres
export PGPASSWORD=postgres
createuser --superuser root

apt-get update
apt-get install -f -y --no-install-recommends \
software-properties-common \
apt-transport-https \
ca-certificates \
build-essential \
pkg-config \
git \
postgresql-server-dev-$PG_MAJOR
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib"
apt-get update
# add-apt-repository "deb http://ftp.debian.org/debian testing main contrib"
# apt-get update
Comment on lines +16 to +23
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want to not merge this. As the commit message indicates, I have no idea what I was doing; but apparently installing ca-certificates solved the TLS problem with git-cloning the wal2json repository?

Not sure what the debian testing repository is good for. If it's still needed, would simply echoing this strings into some /etc/apt/sources.list.d/???.list file work?

rm -rf /var/lib/apt/lists/*

cd /
Expand Down
6 changes: 4 additions & 2 deletions packages/lds/__tests__/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as pg from "pg";
import PgLogicalDecoding from "../src/pg-logical-decoding";
import PgLogicalDecoding, { LdsOptions } from "../src/pg-logical-decoding";

export const DATABASE_URL = process.env.LDS_TEST_DATABASE_URL || "lds_test";
export { PoolClient } from "pg";
Expand Down Expand Up @@ -46,12 +46,14 @@ export async function withLdAndClient<T = void>(
}

export async function withLd<T = void>(
callback: (ld: PgLogicalDecoding) => Promise<T>
callback: (ld: PgLogicalDecoding) => Promise<T>,
options?: LdsOptions
): Promise<T> {
const slotName = "get_ld";
const ld = new PgLogicalDecoding(DATABASE_URL, {
slotName,
temporary: true,
...options,
});
await ld.createSlot();
try {
Expand Down
9 changes: 5 additions & 4 deletions packages/lds/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ test("gets expected data, cleans up, doesn't receive data after cleanup", async
await sub.close();
expect(mockCallback).toHaveBeenCalledTimes(4);
// Now run a new mutation, and expect the mockCallback not to have been called
await withClient(DATABASE_URL, pgClient =>
pgClient.query(
await withClient(DATABASE_URL, async pgClient => {
await pgClient.query(
"insert into app_public.foo(name) values ('temp') returning id"
)
);
);
await sleep(100);
});
expect(mockCallback).toHaveBeenCalledTimes(4);

const {
Expand Down
99 changes: 97 additions & 2 deletions packages/lds/__tests__/pg-logical-decoding.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import PgLogicalDecoding from "../src/pg-logical-decoding";
import { tryDropSlot, DATABASE_URL, query, withLdAndClient } from "./helpers";
import * as assert from "assert";
import * as pg from "pg";
import PgLogicalDecoding, { LdsOptions } from "../src/pg-logical-decoding";
import {
tryDropSlot,
DATABASE_URL,
query,
withLdAndClient,
withLd,
} from "./helpers";

const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

Expand Down Expand Up @@ -271,3 +279,90 @@ test("multiple notifications", () =>
const changes3 = await ld.getChanges();
expect(changes3.length).toEqual(0);
}));

describe("parse results for", () => {
async function getUpdate(options: LdsOptions) {
const {
rows: [{ id }],
} = await query(
`insert into app_public.foo(name) values ('john doe') returning id;`
);
return withLd(async ld => {
await query(
`update app_public.foo set name = 'jane doe' where id = $1;`,
[id]
);
const rows = await ld.getChanges();
const change = rows[0].data.change[0];
assert.strictEqual(change.kind, "update" as const);
return {
id,
keys: ld.changeToPk(change),
data: ld.changeToRecord(change),
change,
};
}, options);
}

test("options without `types` should contain raw wal2json output", async () => {
const { id, keys, data } = await getUpdate({});
expect(keys).toEqual([id]);
expect(data).toEqual({
id,
name: "jane doe",
created_at: expect.any(String), // .stringMatching(isoDateRegex)
updated_at: expect.any(String), // .stringMatching(isoDateRegex)
});
});
test("options with `types` set to pg-types should parse output", async () => {
const getTypeParser = jest.fn(pg.types.getTypeParser); // like jest.spyOn(pg.types, "getTypeParser") but not globally shared
const { id, keys, data } = await getUpdate({
types: { getTypeParser },
});
expect(keys).toEqual([id]);
expect(data.name).toEqual("jane doe");
expect(data.created_at).toEqual(expect.any(Date));
expect(data.updated_at).toEqual(expect.any(Date));
expect(getTypeParser).toHaveBeenCalledTimes(5);
expect(getTypeParser.mock.calls).toEqual([
// in changeToPk (id)
[pg.types.builtins.INT4, "text"],
// in changeToRecord (id, name, created_at, updated_at)
[pg.types.builtins.INT4, "text"],
[pg.types.builtins.TEXT, "text"],
[pg.types.builtins.TIMESTAMPTZ, "text"],
[pg.types.builtins.TIMESTAMPTZ, "text"],
]);
});
test("options with `include-type-oids` overwritten should not have been parsed by `types`", async () => {
const getTypeParser = jest.fn();
const { id, keys } = await getUpdate({
types: { getTypeParser },
params: { "include-type-oids": "f" },
});
expect(keys).toEqual([String(id)]); // `numeric-data-types-as-string` still enabled
expect(getTypeParser).not.toHaveBeenCalled();
});
test("options with `types` set to pg-types should ignore numbers in output", async () => {
const { id, keys, data } = await getUpdate({
types: pg.types,
params: { "numeric-data-types-as-string": "f" },
});
expect(keys).toEqual([id]);
expect(data.name).toEqual("jane doe");
expect(data.created_at).toEqual(expect.any(Date));
});
test("options with `include-pk` and `include-types` set the change should have the respective properties", async () => {
const { change } = await getUpdate({
params: { "include-pk": "t", "include-types": "t" },
});
expect(change.columntypes).toEqual([
"integer",
"text",
"timestamp with time zone",
"timestamp with time zone",
]);
expect(change).toHaveProperty("pk");
expect((change as any).pk.pknames).toEqual(["id"]);
});
});
39 changes: 13 additions & 26 deletions packages/lds/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
/* eslint-disable no-console,curly */
import PgLogicalDecoding, {
changeToRecord,
changeToPk,
} from "./pg-logical-decoding";
import PgLogicalDecoding, { LdsOptions } from "./pg-logical-decoding";
import FatalError from "./fatal-error";

export interface Options {
slotName?: string;
tablePattern?: string;
export interface Options extends LdsOptions {
/** Number of milliseconds between polls. Defaults to `200`. */
sleepDuration?: number;
temporary?: boolean;
}

const DROP_STALE_SLOTS_INTERVAL = 15 * 60 * 1000;
Expand Down Expand Up @@ -57,18 +52,9 @@ export default async function subscribeToLogicalDecoding(
callback: AnnounceCallback,
options: Options = {}
): Promise<LDSubscription> {
const {
slotName = "postgraphile",
tablePattern = "*.*",
sleepDuration = 200,
temporary = false,
} = options;
const { sleepDuration = 200 } = options;
let lastLsn: string | null = null;
const client = new PgLogicalDecoding(connectionString, {
tablePattern,
slotName,
temporary,
});
const client = new PgLogicalDecoding(connectionString, options);

// We must do this before we create the temporary slot, since errors will release a temporary slot immediately
await client.dropStaleSlots();
Expand All @@ -81,7 +67,7 @@ export default async function subscribeToLogicalDecoding(
} else if (e.code === "42710") {
// Slot already exists; ignore.
} else if (e.code === "42602") {
throw new FatalError(`Invalid slot name '${slotName}'?`, e);
throw new FatalError(`Invalid slot name '${client.slotName}'?`, e);
} else {
console.error(
"An unhandled error occurred when attempting to create the replication slot:"
Expand Down Expand Up @@ -118,31 +104,32 @@ export default async function subscribeToLogicalDecoding(
_: "insertC",
schema,
table,
data: changeToRecord(change),
data: client.changeToRecord(change),
};
callback(announcement);
} else if (change.kind === "update") {
const data = client.changeToRecord(change);
const rowAnnouncement: UpdateRowAnnouncement = {
_: "update",
schema,
table,
keys: changeToPk(change),
data: changeToRecord(change),
keys: client.changeToPk(change),
data,
};
callback(rowAnnouncement);
const collectionAnnouncement: UpdateCollectionAnnouncement = {
_: "updateC",
schema,
table,
data: changeToRecord(change),
data,
};
callback(collectionAnnouncement);
} else if (change.kind === "delete") {
const announcement: DeleteRowAnnouncement = {
_: "delete",
schema,
table,
keys: changeToPk(change),
keys: client.changeToPk(change),
};
callback(announcement);
} else {
Expand All @@ -151,7 +138,7 @@ export default async function subscribeToLogicalDecoding(
}
}
}
if (!temporary && nextStaleCheck < Date.now()) {
if (!client.temporary && nextStaleCheck < Date.now()) {
// Roughly every 15 minutes, drop stale slots.
nextStaleCheck = Date.now() + DROP_STALE_SLOTS_INTERVAL;
client.dropStaleSlots().catch(e => {
Expand Down
Loading
Loading