Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 55 additions & 26 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ interface Request {
expected: number;
current: number;
responses: any[];
expectedNodes: Set<string>;
}

interface AckRequest {
Expand Down Expand Up @@ -159,7 +160,7 @@ function onPublishError(err: Error) {
*/
export function createAdapter(
mongoCollection: Collection,
opts: Partial<MongoAdapterOptions> = {}
opts: Partial<MongoAdapterOptions> = {},
) {
opts.uid = opts.uid || randomId();

Expand All @@ -183,7 +184,7 @@ export function createAdapter(
},
},
],
changeStreamOpts
changeStreamOpts,
);

changeStream.on("change", (event: any) => {
Expand Down Expand Up @@ -268,7 +269,7 @@ export class MongoAdapter extends Adapter {
constructor(
nsp: any,
mongoCollection: Collection,
opts: Partial<MongoAdapterOptions> = {}
opts: Partial<MongoAdapterOptions> = {},
) {
super(nsp);
this.mongoCollection = mongoCollection;
Expand Down Expand Up @@ -297,7 +298,7 @@ export class MongoAdapter extends Adapter {
"new event of type %d for %s from %s",
document.type,
document.nsp,
document.uid
document.uid,
);

if (document.uid && document.uid !== EMITTER_UID) {
Expand Down Expand Up @@ -338,7 +339,7 @@ export class MongoAdapter extends Adapter {
packet: arg,
},
});
}
},
);
} else {
const packet = replaceBinaryObjectsByBuffers(document.data.packet);
Expand All @@ -360,7 +361,7 @@ export class MongoAdapter extends Adapter {
case EventType.BROADCAST_ACK: {
const request = this.ackRequests.get(document.data.requestId);
const clientResponse = replaceBinaryObjectsByBuffers(
document.data.packet
document.data.packet,
);
request?.ack(clientResponse);
break;
Expand All @@ -370,30 +371,30 @@ export class MongoAdapter extends Adapter {
debug("calling addSockets with opts %j", document.data.opts);
super.addSockets(
MongoAdapter.deserializeOptions(document.data.opts),
document.data.rooms
document.data.rooms,
);
break;
}
case EventType.SOCKETS_LEAVE: {
debug("calling delSockets with opts %j", document.data.opts);
super.delSockets(
MongoAdapter.deserializeOptions(document.data.opts),
document.data.rooms
document.data.rooms,
);
break;
}
case EventType.DISCONNECT_SOCKETS: {
debug("calling disconnectSockets with opts %j", document.data.opts);
super.disconnectSockets(
MongoAdapter.deserializeOptions(document.data.opts),
document.data.close
document.data.close,
);
break;
}
case EventType.FETCH_SOCKETS: {
debug("calling fetchSockets with opts %j", document.data.opts);
const localSockets = await super.fetchSockets(
MongoAdapter.deserializeOptions(document.data.opts)
MongoAdapter.deserializeOptions(document.data.opts),
);

this.publish({
Expand All @@ -419,7 +420,7 @@ export class MongoAdapter extends Adapter {

request.current++;
document.data.sockets.forEach((socket: any) =>
request.responses.push(socket)
request.responses.push(socket),
);

if (request.current === request.expected) {
Expand Down Expand Up @@ -485,6 +486,7 @@ export class MongoAdapter extends Adapter {
this.publish({
type: EventType.HEARTBEAT,
}).catch(onPublishError);
this.evictDeadNodes();
this.scheduleHeartbeat();
}, this.heartbeatInterval);
}
Expand Down Expand Up @@ -565,7 +567,7 @@ export class MongoAdapter extends Adapter {
private addOffsetIfNecessary(
packet: any,
opts: BroadcastOptions,
offset: string
offset: string,
) {
if (!this.nsp.server.opts.connectionStateRecovery) {
return;
Expand All @@ -585,7 +587,7 @@ export class MongoAdapter extends Adapter {
packet: any,
opts: BroadcastOptions,
clientCountCallback: (clientCount: number) => void,
ack: (...args: any[]) => void
ack: (...args: any[]) => void,
) {
const onlyLocal = opts?.flags?.local;
if (!onlyLocal) {
Expand Down Expand Up @@ -620,14 +622,39 @@ export class MongoAdapter extends Adapter {
}

public serverCount(): Promise<number> {
this.evictDeadNodes();
return Promise.resolve(1 + this.nodesMap.size);
}

private evictDeadNodes(): void {
this.nodesMap.forEach((lastSeen, uid) => {
const nodeSeemsDown = Date.now() - lastSeen > this.heartbeatTimeout;
if (nodeSeemsDown) {
debug("node %s seems down", uid);
this.nodesMap.delete(uid);
this.resolveRequestsForDeadNode(uid);
}
});
}

private resolveRequestsForDeadNode(uid: string): void {
this.requests.forEach((request, requestId) => {
if (!request.expectedNodes.has(uid)) {
return;
}
request.expectedNodes.delete(uid);
request.expected--;

if (request.current >= request.expected) {
clearTimeout(request.timeout);
if (request.type === EventType.SERVER_SIDE_EMIT) {
request.resolve(null, request.responses);
} else {
request.resolve(request.responses);
}
this.requests.delete(requestId);
}
});
return Promise.resolve(1 + this.nodesMap.size);
}

addSockets(opts: BroadcastOptions, rooms: Room[]) {
Expand Down Expand Up @@ -691,14 +718,14 @@ export class MongoAdapter extends Adapter {

const requestId = randomId();

return new Promise((resolve, reject) => {
return new Promise(async (resolve, reject) => {
const timeout = setTimeout(() => {
const storedRequest = this.requests.get(requestId);
if (storedRequest) {
reject(
new Error(
`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`
)
`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`,
),
);
this.requests.delete(requestId);
}
Expand All @@ -711,16 +738,17 @@ export class MongoAdapter extends Adapter {
current: 0,
expected: expectedResponseCount,
responses: localSockets,
expectedNodes: new Set(this.nodesMap.keys()),
};
this.requests.set(requestId, storedRequest);

this.publish({
await this.publish({
type: EventType.FETCH_SOCKETS,
data: {
opts: MongoAdapter.serializeOptions(opts),
requestId,
},
});
}).catch(onPublishError);
});
}

Expand Down Expand Up @@ -748,7 +776,7 @@ export class MongoAdapter extends Adapter {

debug(
'waiting for %d responses to "serverSideEmit" request',
expectedResponseCount
expectedResponseCount,
);

if (expectedResponseCount <= 0) {
Expand All @@ -762,21 +790,22 @@ export class MongoAdapter extends Adapter {
if (storedRequest) {
ack(
new Error(
`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`
`timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`,
),
storedRequest.responses
storedRequest.responses,
);
this.requests.delete(requestId);
}
}, this.requestsTimeout);

const storedRequest = {
type: EventType.FETCH_SOCKETS,
type: EventType.SERVER_SIDE_EMIT,
resolve: ack,
timeout,
current: 0,
expected: expectedResponseCount,
responses: [],
expectedNodes: new Set(this.nodesMap.keys()),
};
this.requests.set(requestId, storedRequest);

Expand All @@ -799,7 +828,7 @@ export class MongoAdapter extends Adapter {

override async restoreSession(
pid: PrivateSessionId,
offset: string
offset: string,
): Promise<Session> {
if (!ObjectId.isValid(offset)) {
return Promise.reject("invalid offset");
Expand Down Expand Up @@ -890,7 +919,7 @@ export class MongoAdapter extends Adapter {
}

private findSession(
pid: PrivateSessionId
pid: PrivateSessionId,
): Promise<WithId<Document> | undefined> {
const isCollectionCapped = !this.addCreatedAtField;
if (isCollectionCapped) {
Expand All @@ -904,7 +933,7 @@ export class MongoAdapter extends Adapter {
sort: {
_id: -1,
},
}
},
)
.then((result) => {
if (!result) {
Expand Down
Loading