diff --git a/.changeset/abort-handlers-on-close.md b/.changeset/abort-handlers-on-close.md new file mode 100644 index 000000000..b6bc65e65 --- /dev/null +++ b/.changeset/abort-handlers-on-close.md @@ -0,0 +1,5 @@ +--- +'@modelcontextprotocol/core': patch +--- + +Abort in-flight request handlers when the connection closes. Previously, request handlers would continue running after the transport disconnected, wasting resources and preventing proper cleanup. Also fixes `InMemoryTransport.close()` firing `onclose` twice on the initiating side. diff --git a/packages/core/src/shared/protocol.ts b/packages/core/src/shared/protocol.ts index d82d67da3..836ec3a19 100644 --- a/packages/core/src/shared/protocol.ts +++ b/packages/core/src/shared/protocol.ts @@ -494,13 +494,28 @@ export abstract class Protocol { this._taskManager.onClose(); this._pendingDebouncedNotifications.clear(); + for (const info of this._timeoutInfo.values()) { + clearTimeout(info.timeoutId); + } + this._timeoutInfo.clear(); + + const requestHandlerAbortControllers = this._requestHandlerAbortControllers; + this._requestHandlerAbortControllers = new Map(); + const error = new SdkError(SdkErrorCode.ConnectionClosed, 'Connection closed'); this._transport = undefined; - this.onclose?.(); - for (const handler of responseHandlers.values()) { - handler(error); + try { + this.onclose?.(); + } finally { + for (const handler of responseHandlers.values()) { + handler(error); + } + + for (const controller of requestHandlerAbortControllers.values()) { + controller.abort(error); + } } } diff --git a/packages/core/src/shared/taskManager.ts b/packages/core/src/shared/taskManager.ts index 28460d1d9..2d6f4eeaa 100644 --- a/packages/core/src/shared/taskManager.ts +++ b/packages/core/src/shared/taskManager.ts @@ -801,6 +801,7 @@ export class TaskManager { onClose(): void { this._taskProgressTokens.clear(); + this._requestResolvers.clear(); } // -- Private helpers -- diff --git a/packages/core/src/util/inMemory.ts b/packages/core/src/util/inMemory.ts index 1103b5733..256363c13 100644 --- a/packages/core/src/util/inMemory.ts +++ b/packages/core/src/util/inMemory.ts @@ -13,6 +13,7 @@ interface QueuedMessage { export class InMemoryTransport implements Transport { private _otherTransport?: InMemoryTransport; private _messageQueue: QueuedMessage[] = []; + private _closed = false; onclose?: () => void; onerror?: (error: Error) => void; @@ -39,10 +40,16 @@ export class InMemoryTransport implements Transport { } async close(): Promise { + if (this._closed) return; + this._closed = true; + const other = this._otherTransport; this._otherTransport = undefined; - await other?.close(); - this.onclose?.(); + try { + await other?.close(); + } finally { + this.onclose?.(); + } } /** diff --git a/packages/core/test/inMemory.test.ts b/packages/core/test/inMemory.test.ts index 280efdf4b..46332eaa2 100644 --- a/packages/core/test/inMemory.test.ts +++ b/packages/core/test/inMemory.test.ts @@ -99,6 +99,53 @@ describe('InMemoryTransport', () => { await expect(clientTransport.send({ jsonrpc: '2.0', method: 'test', id: 1 })).rejects.toThrow('Not connected'); }); + test('should fire onclose exactly once per transport', async () => { + let clientCloseCount = 0; + let serverCloseCount = 0; + + clientTransport.onclose = () => clientCloseCount++; + serverTransport.onclose = () => serverCloseCount++; + + await clientTransport.close(); + + expect(clientCloseCount).toBe(1); + expect(serverCloseCount).toBe(1); + }); + + test('should handle double close idempotently', async () => { + let clientCloseCount = 0; + clientTransport.onclose = () => clientCloseCount++; + + await clientTransport.close(); + await clientTransport.close(); + + expect(clientCloseCount).toBe(1); + }); + + test('should handle concurrent close from both sides', async () => { + let clientCloseCount = 0; + let serverCloseCount = 0; + + clientTransport.onclose = () => clientCloseCount++; + serverTransport.onclose = () => serverCloseCount++; + + await Promise.all([clientTransport.close(), serverTransport.close()]); + + expect(clientCloseCount).toBe(1); + expect(serverCloseCount).toBe(1); + }); + + test('should fire onclose even if peer onclose throws', async () => { + let clientCloseCount = 0; + clientTransport.onclose = () => clientCloseCount++; + serverTransport.onclose = () => { + throw new Error('boom'); + }; + + await expect(clientTransport.close()).rejects.toThrow('boom'); + expect(clientCloseCount).toBe(1); + }); + test('should queue messages sent before start', async () => { const message: JSONRPCMessage = { jsonrpc: '2.0', diff --git a/packages/core/test/shared/protocol.test.ts b/packages/core/test/shared/protocol.test.ts index 60e4e6d24..69735bc3a 100644 --- a/packages/core/test/shared/protocol.test.ts +++ b/packages/core/test/shared/protocol.test.ts @@ -217,6 +217,36 @@ describe('protocol tests', () => { expect(oncloseMock).toHaveBeenCalled(); }); + test('should abort in-flight request handlers when the connection is closed', async () => { + await protocol.connect(transport); + + let abortReason: unknown; + let handlerStarted = false; + const handlerDone = new Promise(resolve => { + protocol.setRequestHandler('ping', async (_request, ctx) => { + handlerStarted = true; + await new Promise(resolveInner => { + ctx.mcpReq.signal.addEventListener('abort', () => { + abortReason = ctx.mcpReq.signal.reason; + resolveInner(); + }); + }); + resolve(); + return {}; + }); + }); + + transport.onmessage?.({ jsonrpc: '2.0', id: 1, method: 'ping', params: {} }); + + await vi.waitFor(() => expect(handlerStarted).toBe(true)); + + await transport.close(); + await handlerDone; + + expect(abortReason).toBeInstanceOf(SdkError); + expect((abortReason as SdkError).code).toBe(SdkErrorCode.ConnectionClosed); + }); + test('should not overwrite existing hooks when connecting transports', async () => { const oncloseMock = vi.fn(); const onerrorMock = vi.fn();