From 585b1cd4a4a5f72f5cc94be216baff5cd258a8f0 Mon Sep 17 00:00:00 2001 From: Florian Brombauer Date: Mon, 2 Mar 2026 14:32:51 +0100 Subject: [PATCH 1/3] fix: pending requests should be resolved when heartbeat eviction removes a dead node Reference: https://github.com/socketio/socket.io-mongo-adapter/issues/27 --- lib/index.ts | 31 ++++++++++++- test/index.ts | 126 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+), 1 deletion(-) diff --git a/lib/index.ts b/lib/index.ts index 34a1dc0..e610a13 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -67,6 +67,7 @@ interface Request { expected: number; current: number; responses: any[]; + expectedNodes: Set; } interface AckRequest { @@ -485,6 +486,7 @@ export class MongoAdapter extends Adapter { this.publish({ type: EventType.HEARTBEAT, }).catch(onPublishError); + this.evictDeadNodes(); this.scheduleHeartbeat(); }, this.heartbeatInterval); } @@ -620,14 +622,39 @@ export class MongoAdapter extends Adapter { } public serverCount(): Promise { + this.evictDeadNodes(); + return Promise.resolve(1 + this.nodesMap.size); + } + + private evictDeadNodes(): void { this.nodesMap.forEach((lastSeen, uid) => { const nodeSeemsDown = Date.now() - lastSeen > this.heartbeatTimeout; if (nodeSeemsDown) { debug("node %s seems down", uid); this.nodesMap.delete(uid); + this.resolveRequestsForDeadNode(uid); + } + }); + } + + private resolveRequestsForDeadNode(uid: string): void { + this.requests.forEach((request, requestId) => { + if (!request.expectedNodes.has(uid)) { + return; + } + request.expectedNodes.delete(uid); + request.expected--; + + if (request.current >= request.expected) { + clearTimeout(request.timeout); + if (request.type === EventType.SERVER_SIDE_EMIT) { + request.resolve(null, request.responses); + } else { + request.resolve(request.responses); + } + this.requests.delete(requestId); } }); - return Promise.resolve(1 + this.nodesMap.size); } addSockets(opts: BroadcastOptions, rooms: Room[]) { @@ -711,6 +738,7 @@ export class MongoAdapter extends Adapter { current: 0, expected: expectedResponseCount, responses: localSockets, + expectedNodes: new Set(this.nodesMap.keys()), }; this.requests.set(requestId, storedRequest); @@ -777,6 +805,7 @@ export class MongoAdapter extends Adapter { current: 0, expected: expectedResponseCount, responses: [], + expectedNodes: new Set(this.nodesMap.keys()), }; this.requests.set(requestId, storedRequest); diff --git a/test/index.ts b/test/index.ts index 304c4a1..3546a51 100644 --- a/test/index.ts +++ b/test/index.ts @@ -476,3 +476,129 @@ describe("@socket.io/mongodb-adapter", () => { import("./connection-state-recovery"); }); + +describe("dead node eviction", function () { + let servers: Server[], + serverSockets: ServerSocket[], + clientSockets: ClientSocket[], + mongoClient: MongoClient; + + beforeEach(async function () { + servers = []; + serverSockets = []; + clientSockets = []; + + mongoClient = new MongoClient( + "mongodb://localhost:27017/?replicaSet=rs0&directConnection=true", + ); + await mongoClient.connect(); + + const collection = mongoClient.db("test").collection("events-eviction"); + + return new Promise((resolve) => { + for (let i = 1; i <= NODES_COUNT; i++) { + const httpServer = createServer(); + const io = new Server(httpServer); + io.adapter( + createAdapter(collection, { + heartbeatInterval: 100, + heartbeatTimeout: 200, + requestsTimeout: 2000, + }), + ); + httpServer.listen(() => { + const port = (httpServer.address() as AddressInfo).port; + const clientSocket = ioc(`http://localhost:${port}`); + + io.on("connection", async (socket) => { + clientSockets.push(clientSocket); + serverSockets.push(socket); + servers.push(io); + if (servers.length === NODES_COUNT) { + servers[0].emit("ping"); + servers[1].emit("ping"); + servers[2].emit("ping"); + + await sleep(500); + + resolve(); + } + }); + }); + } + }); + }); + + afterEach(async function () { + servers.forEach((server) => server.close()); + clientSockets.forEach((socket) => socket.disconnect()); + await mongoClient.close(); + }); + + it("resolves pending fetchSockets when a dead node is evicted", async function () { + this.timeout(5000); + + // Close server 2 — its adapter stops sending heartbeats + servers[2].close(); + clientSockets[2].disconnect(); + + // Immediately call fetchSockets on server 0. + // At this point server 2 is still in the nodesMap (heartbeat not expired yet), + // so expected=2 (servers 1 and 2). Server 1 responds quickly (current=1). + // Without the fix: waits the full requestsTimeout (2000ms) and rejects. + // With the fix: after heartbeatTimeout (~200ms) server 2 is evicted by + // evictDeadNodes, resolveRequestsForDeadNode decrements expected to 1, + // current(1) >= expected(1) → resolves early. + const start = Date.now(); + const sockets = await servers[0].fetchSockets(); + const elapsed = Date.now() - start; + + // Should resolve well before the 2000ms requestsTimeout + expect(elapsed).to.be.below(1500); + // Sockets from server 0 (local) and server 1 (remote) + expect(sockets).to.have.length(2); + // @ts-ignore + expect(servers[0].of("/").adapter.requests.size).to.eql(0); + }); + + it("resolves pending serverSideEmit when a dead node is evicted", function (done) { + this.timeout(5000); + + servers[2].close(); + clientSockets[2].disconnect(); + + servers[1].on("hello", (cb) => { + cb(2); + }); + + const start = Date.now(); + servers[0].serverSideEmit("hello", (err: Error, response: any) => { + const elapsed = Date.now() - start; + + expect(err).to.be(null); + expect(response).to.be.an(Array); + expect(response).to.contain(2); + expect(elapsed).to.be.below(1500); + done(); + }); + }); + + it("fetchSockets succeeds immediately after dead node is already evicted", async function () { + this.timeout(5000); + + servers[2].close(); + clientSockets[2].disconnect(); + + // Wait for heartbeatTimeout so the dead node is evicted from nodesMap + await sleep(500); + + // Now fetchSockets should succeed immediately — serverCount() no longer + // includes the dead node, so expected=1 (only server 1). + const start = Date.now(); + const sockets = await servers[0].fetchSockets(); + const elapsed = Date.now() - start; + + expect(elapsed).to.be.below(500); + expect(sockets).to.have.length(2); + }); +}); From 0fd5cde16cd5c136fb2ef158853a768f2a0d1781 Mon Sep 17 00:00:00 2001 From: Florian Brombauer Date: Mon, 2 Mar 2026 14:35:47 +0100 Subject: [PATCH 2/3] fix: wrong type in storedRequest of serverSideEmitWithAck --- lib/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/index.ts b/lib/index.ts index e610a13..83f0a51 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -799,7 +799,7 @@ export class MongoAdapter extends Adapter { }, this.requestsTimeout); const storedRequest = { - type: EventType.FETCH_SOCKETS, + type: EventType.SERVER_SIDE_EMIT, resolve: ack, timeout, current: 0, From a55b4a696fc82a2b69b630560bca195d04ac3bda Mon Sep 17 00:00:00 2001 From: Florian Brombauer Date: Wed, 25 Mar 2026 13:34:14 +0100 Subject: [PATCH 3/3] fix: add missing catch in fetchSockets() --- lib/index.ts | 48 ++++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/lib/index.ts b/lib/index.ts index 83f0a51..17bd3d7 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -160,7 +160,7 @@ function onPublishError(err: Error) { */ export function createAdapter( mongoCollection: Collection, - opts: Partial = {} + opts: Partial = {}, ) { opts.uid = opts.uid || randomId(); @@ -184,7 +184,7 @@ export function createAdapter( }, }, ], - changeStreamOpts + changeStreamOpts, ); changeStream.on("change", (event: any) => { @@ -269,7 +269,7 @@ export class MongoAdapter extends Adapter { constructor( nsp: any, mongoCollection: Collection, - opts: Partial = {} + opts: Partial = {}, ) { super(nsp); this.mongoCollection = mongoCollection; @@ -298,7 +298,7 @@ export class MongoAdapter extends Adapter { "new event of type %d for %s from %s", document.type, document.nsp, - document.uid + document.uid, ); if (document.uid && document.uid !== EMITTER_UID) { @@ -339,7 +339,7 @@ export class MongoAdapter extends Adapter { packet: arg, }, }); - } + }, ); } else { const packet = replaceBinaryObjectsByBuffers(document.data.packet); @@ -361,7 +361,7 @@ export class MongoAdapter extends Adapter { case EventType.BROADCAST_ACK: { const request = this.ackRequests.get(document.data.requestId); const clientResponse = replaceBinaryObjectsByBuffers( - document.data.packet + document.data.packet, ); request?.ack(clientResponse); break; @@ -371,7 +371,7 @@ export class MongoAdapter extends Adapter { debug("calling addSockets with opts %j", document.data.opts); super.addSockets( MongoAdapter.deserializeOptions(document.data.opts), - document.data.rooms + document.data.rooms, ); break; } @@ -379,7 +379,7 @@ export class MongoAdapter extends Adapter { debug("calling delSockets with opts %j", document.data.opts); super.delSockets( MongoAdapter.deserializeOptions(document.data.opts), - document.data.rooms + document.data.rooms, ); break; } @@ -387,14 +387,14 @@ export class MongoAdapter extends Adapter { debug("calling disconnectSockets with opts %j", document.data.opts); super.disconnectSockets( MongoAdapter.deserializeOptions(document.data.opts), - document.data.close + document.data.close, ); break; } case EventType.FETCH_SOCKETS: { debug("calling fetchSockets with opts %j", document.data.opts); const localSockets = await super.fetchSockets( - MongoAdapter.deserializeOptions(document.data.opts) + MongoAdapter.deserializeOptions(document.data.opts), ); this.publish({ @@ -420,7 +420,7 @@ export class MongoAdapter extends Adapter { request.current++; document.data.sockets.forEach((socket: any) => - request.responses.push(socket) + request.responses.push(socket), ); if (request.current === request.expected) { @@ -567,7 +567,7 @@ export class MongoAdapter extends Adapter { private addOffsetIfNecessary( packet: any, opts: BroadcastOptions, - offset: string + offset: string, ) { if (!this.nsp.server.opts.connectionStateRecovery) { return; @@ -587,7 +587,7 @@ export class MongoAdapter extends Adapter { packet: any, opts: BroadcastOptions, clientCountCallback: (clientCount: number) => void, - ack: (...args: any[]) => void + ack: (...args: any[]) => void, ) { const onlyLocal = opts?.flags?.local; if (!onlyLocal) { @@ -718,14 +718,14 @@ export class MongoAdapter extends Adapter { const requestId = randomId(); - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { const timeout = setTimeout(() => { const storedRequest = this.requests.get(requestId); if (storedRequest) { reject( new Error( - `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` - ) + `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`, + ), ); this.requests.delete(requestId); } @@ -742,13 +742,13 @@ export class MongoAdapter extends Adapter { }; this.requests.set(requestId, storedRequest); - this.publish({ + await this.publish({ type: EventType.FETCH_SOCKETS, data: { opts: MongoAdapter.serializeOptions(opts), requestId, }, - }); + }).catch(onPublishError); }); } @@ -776,7 +776,7 @@ export class MongoAdapter extends Adapter { debug( 'waiting for %d responses to "serverSideEmit" request', - expectedResponseCount + expectedResponseCount, ); if (expectedResponseCount <= 0) { @@ -790,9 +790,9 @@ export class MongoAdapter extends Adapter { if (storedRequest) { ack( new Error( - `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}` + `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`, ), - storedRequest.responses + storedRequest.responses, ); this.requests.delete(requestId); } @@ -828,7 +828,7 @@ export class MongoAdapter extends Adapter { override async restoreSession( pid: PrivateSessionId, - offset: string + offset: string, ): Promise { if (!ObjectId.isValid(offset)) { return Promise.reject("invalid offset"); @@ -919,7 +919,7 @@ export class MongoAdapter extends Adapter { } private findSession( - pid: PrivateSessionId + pid: PrivateSessionId, ): Promise | undefined> { const isCollectionCapped = !this.addCreatedAtField; if (isCollectionCapped) { @@ -933,7 +933,7 @@ export class MongoAdapter extends Adapter { sort: { _id: -1, }, - } + }, ) .then((result) => { if (!result) {