From b8e4486fc3a724c79f2c0a6b65a678ee97757def Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Sat, 7 Mar 2026 17:16:05 -0800 Subject: [PATCH 1/8] encode before incrementing seq + putting in send buffer --- __tests__/unserializable.test.ts | 160 ++++++++++++++++++ testUtil/fixtures/cleanup.ts | 18 +- transport/message.ts | 11 ++ transport/results.ts | 3 +- .../sessionStateMachine/SessionConnected.ts | 63 ++++--- transport/sessionStateMachine/common.ts | 46 +++-- .../sessionStateMachine/stateMachine.test.ts | 9 +- transport/sessionStateMachine/transitions.ts | 6 +- 8 files changed, 268 insertions(+), 48 deletions(-) create mode 100644 __tests__/unserializable.test.ts diff --git a/__tests__/unserializable.test.ts b/__tests__/unserializable.test.ts new file mode 100644 index 00000000..15981a0b --- /dev/null +++ b/__tests__/unserializable.test.ts @@ -0,0 +1,160 @@ +import { beforeEach, describe, expect, test } from 'vitest'; +import { Type } from '@sinclair/typebox'; +import { + Procedure, + createServiceSchema, + Ok, + createClient, + createServer, + UNEXPECTED_DISCONNECT_CODE, +} from '../router'; +import { testMatrix } from '../testUtil/fixtures/matrix'; +import { + advanceFakeTimersBySessionGrace, + cleanupTransports, + createPostTestCleanups, +} from '../testUtil/fixtures/cleanup'; +import { TestSetupHelpers } from '../testUtil/fixtures/transports'; +import { readNextResult } from '../testUtil'; + +const ServiceSchema = createServiceSchema(); + +const UnserializableServiceSchema = ServiceSchema.define({ + returnSymbol: Procedure.rpc({ + requestInit: Type.Object({}), + responseData: Type.Object({ id: Type.String() }), + async handler() { + return Ok({ id: 'test', extra: Symbol('unserializable') }); + }, + }), + streamSymbol: Procedure.subscription({ + requestInit: Type.Object({}), + responseData: Type.Object({ id: Type.String() }), + async handler({ resWritable }) { + resWritable.write(Ok({ id: 'test', extra: Symbol('unserializable') })); + resWritable.close(); + }, + }), +}); + +describe('unserializable values in procedure handlers', () => { + // binary codec (msgpack) throws on Symbol, causing encode failure + // which kills the session -- only test with ws transport since mock + // transport's setImmediate chains conflict with fake timer flushing + describe.each(testMatrix(['ws', 'binary']))( + 'binary codec ($transport.name transport)', + ({ transport, codec }) => { + const opts = { codec: codec.codec }; + const { addPostTestCleanup, postTestCleanup } = createPostTestCleanups(); + let getClientTransport: TestSetupHelpers['getClientTransport']; + let getServerTransport: TestSetupHelpers['getServerTransport']; + + beforeEach(async () => { + const setup = await transport.setup({ client: opts, server: opts }); + getClientTransport = setup.getClientTransport; + getServerTransport = setup.getServerTransport; + + return async () => { + await postTestCleanup(); + await setup.cleanup(); + }; + }); + + test('rpc handler returning symbol causes client disconnect', async () => { + const clientTransport = getClientTransport('client'); + const serverTransport = getServerTransport(); + const services = { svc: UnserializableServiceSchema }; + createServer(serverTransport, services); + const client = createClient( + clientTransport, + serverTransport.clientId, + ); + addPostTestCleanup(() => + cleanupTransports([clientTransport, serverTransport]), + ); + + const resultPromise = client.svc.returnSymbol.rpc({}); + await advanceFakeTimersBySessionGrace(); + + const result = await resultPromise; + expect(result).toMatchObject({ + ok: false, + payload: { + code: UNEXPECTED_DISCONNECT_CODE, + }, + }); + }); + + test('subscription handler writing symbol causes client disconnect', async () => { + const clientTransport = getClientTransport('client'); + const serverTransport = getServerTransport(); + const services = { svc: UnserializableServiceSchema }; + createServer(serverTransport, services); + const client = createClient( + clientTransport, + serverTransport.clientId, + ); + addPostTestCleanup(() => + cleanupTransports([clientTransport, serverTransport]), + ); + + const { resReadable } = client.svc.streamSymbol.subscribe({}); + await advanceFakeTimersBySessionGrace(); + + const result = await readNextResult(resReadable); + expect(result).toMatchObject({ + ok: false, + payload: { + code: UNEXPECTED_DISCONNECT_CODE, + }, + }); + }); + }, + ); + + // json codec silently drops Symbol values via JSON.stringify + describe.each(testMatrix(['all', 'naive']))( + 'json codec ($transport.name transport)', + ({ transport, codec }) => { + const opts = { codec: codec.codec }; + const { addPostTestCleanup, postTestCleanup } = createPostTestCleanups(); + let getClientTransport: TestSetupHelpers['getClientTransport']; + let getServerTransport: TestSetupHelpers['getServerTransport']; + + beforeEach(async () => { + const setup = await transport.setup({ client: opts, server: opts }); + getClientTransport = setup.getClientTransport; + getServerTransport = setup.getServerTransport; + + return async () => { + await postTestCleanup(); + await setup.cleanup(); + }; + }); + + test('rpc handler returning symbol silently drops the value', async () => { + const clientTransport = getClientTransport('client'); + const serverTransport = getServerTransport(); + const services = { svc: UnserializableServiceSchema }; + const server = createServer(serverTransport, services); + const client = createClient( + clientTransport, + serverTransport.clientId, + ); + addPostTestCleanup(() => + cleanupTransports([clientTransport, serverTransport]), + ); + + const result = await client.svc.returnSymbol.rpc({}); + // JSON.stringify silently drops Symbol values, so the + // response arrives with the extra symbol field missing + expect(result).toStrictEqual({ + ok: true, + payload: { id: 'test' }, + }); + + await server.close(); + }); + }, + ); +}); diff --git a/testUtil/fixtures/cleanup.ts b/testUtil/fixtures/cleanup.ts index 2ac0ed43..43cf38eb 100644 --- a/testUtil/fixtures/cleanup.ts +++ b/testUtil/fixtures/cleanup.ts @@ -1,8 +1,7 @@ -import { expect, vi } from 'vitest'; +import { assert, expect, vi } from 'vitest'; import { ClientTransport, Connection, - OpaqueTransportMessage, ServerTransport, Transport, } from '../../transport'; @@ -68,14 +67,17 @@ export async function ensureTransportBuffersAreEventuallyEmpty( [...t.sessions] .map(([client, sess]) => { // get all messages that are not heartbeats - const buff = sess.sendBuffer.filter((msg) => { - return !Value.Check(ControlMessageAckSchema, msg.payload); + const buff = sess.sendBuffer.filter((encodedMsg) => { + const decoded = sess.codec.fromBuffer(encodedMsg.data); + assert(decoded.ok); + + return !Value.Check( + ControlMessageAckSchema, + decoded.value.payload, + ); }); - return [client, buff] as [ - string, - ReadonlyArray, - ]; + return [client, buff] as const; }) .filter((entry) => entry[1].length > 0), ), diff --git a/transport/message.ts b/transport/message.ts index 1879341d..d8ccdbc1 100644 --- a/transport/message.ts +++ b/transport/message.ts @@ -280,6 +280,17 @@ export function cancelMessage( export type OpaqueTransportMessage = TransportMessage; export type TransportClientId = string; +/** + * An encoded message that is ready to be sent over the transport. + * The seq number is kept to track which messages have been + * acked by the peer and can be dropped from the send buffer. + */ +export interface EncodedTransportMessage { + id: string; + seq: number; + data: Uint8Array; +} + /** * Checks if the given control flag (usually found in msg.controlFlag) is an ack message. * @param controlFlag - The control flag to check. diff --git a/transport/results.ts b/transport/results.ts index ff8c12da..df9879b9 100644 --- a/transport/results.ts +++ b/transport/results.ts @@ -1,4 +1,4 @@ -import { OpaqueTransportMessage } from './message'; +import { EncodedTransportMessage, OpaqueTransportMessage } from './message'; // internal use only, not to be used in public API type SessionApiResult = @@ -13,5 +13,6 @@ type SessionApiResult = export type SendResult = SessionApiResult; export type SendBufferResult = SessionApiResult; +export type EncodeResult = SessionApiResult; export type SerializeResult = SessionApiResult; export type DeserializeResult = SessionApiResult; diff --git a/transport/sessionStateMachine/SessionConnected.ts b/transport/sessionStateMachine/SessionConnected.ts index 747c02cb..1633c017 100644 --- a/transport/sessionStateMachine/SessionConnected.ts +++ b/transport/sessionStateMachine/SessionConnected.ts @@ -2,15 +2,14 @@ import { Static } from '@sinclair/typebox'; import { ControlFlags, ControlMessageAckSchema, + EncodedTransportMessage, OpaqueTransportMessage, PartialTransportMessage, - TransportMessage, isAck, } from '../message'; import { IdentifiedSession, IdentifiedSessionProps, - sendMessage, SessionState, } from './common'; import { Connection } from '../connection'; @@ -21,7 +20,10 @@ export interface SessionConnectedListeners { onConnectionErrored: (err: unknown) => void; onConnectionClosed: () => void; onMessage: (msg: OpaqueTransportMessage) => void; - onMessageSendFailure: (msg: PartialTransportMessage, reason: string) => void; + onMessageSendFailure: ( + msg: Omit, + reason: string, + ) => void; onInvalidMessage: (reason: string) => void; } @@ -57,12 +59,11 @@ export class SessionConnected< this.startMissingHeartbeatTimeout(); } - private assertSendOrdering(constructedMsg: TransportMessage) { - if (constructedMsg.seq > this.seqSent + 1) { - const msg = `invariant violation: would have sent out of order msg (seq: ${constructedMsg.seq}, expected: ${this.seqSent} + 1)`; + private assertSendOrdering(encodedMsg: EncodedTransportMessage) { + if (encodedMsg.seq > this.seqSent + 1) { + const msg = `invariant violation: would have sent out of order msg (seq: ${encodedMsg.seq}, expected: ${this.seqSent} + 1)`; this.log?.error(msg, { ...this.loggingMetadata, - transportMessage: constructedMsg, tags: ['invariant-violation'], }); @@ -71,19 +72,34 @@ export class SessionConnected< } send(msg: PartialTransportMessage): SendResult { - const constructedMsg = this.constructMsg(msg); - this.assertSendOrdering(constructedMsg); - this.sendBuffer.push(constructedMsg); - const res = sendMessage(this.conn, this.codec, constructedMsg); - if (!res.ok) { - this.listeners.onMessageSendFailure(constructedMsg, res.reason); - - return res; + const encodeResult = this.encodeMsg(msg); + if (!encodeResult.ok) { + this.listeners.onMessageSendFailure( + { id: 'unknown', seq: this.seq }, + encodeResult.reason, + ); + + return encodeResult; + } + + const encodedMsg = encodeResult.value; + this.assertSendOrdering(encodedMsg); + this.sendBuffer.push(encodedMsg); + + const sent = this.conn.send(encodedMsg.data); + if (!sent) { + const reason = 'failed to send message'; + this.listeners.onMessageSendFailure( + { id: encodedMsg.id, seq: encodedMsg.seq }, + reason, + ); + + return { ok: false, reason }; } - this.seqSent = constructedMsg.seq; + this.seqSent = encodedMsg.seq; - return res; + return { ok: true, value: encodedMsg.id }; } constructor(props: SessionConnectedProps) { @@ -110,11 +126,16 @@ export class SessionConnected< for (const msg of this.sendBuffer) { this.assertSendOrdering(msg); - const res = sendMessage(this.conn, this.codec, msg); - if (!res.ok) { - this.listeners.onMessageSendFailure(msg, res.reason); - return res; + const sent = this.conn.send(msg.data); + if (!sent) { + const reason = 'failed to send buffered message'; + this.listeners.onMessageSendFailure( + { id: msg.id, seq: msg.seq }, + reason, + ); + + return { ok: false, reason }; } this.seqSent = msg.seq; diff --git a/transport/sessionStateMachine/common.ts b/transport/sessionStateMachine/common.ts index 2db1499b..bf6ce367 100644 --- a/transport/sessionStateMachine/common.ts +++ b/transport/sessionStateMachine/common.ts @@ -1,7 +1,7 @@ import { Logger, MessageMetadata } from '../../logging'; import { TelemetryInfo } from '../../tracing'; import { - OpaqueTransportMessage, + EncodedTransportMessage, PartialTransportMessage, ProtocolVersion, TransportClientId, @@ -10,7 +10,7 @@ import { import { Codec, CodecMessageAdapter } from '../../codec'; import { generateId } from '../id'; import { Tracer } from '@opentelemetry/api'; -import { SendResult } from '../results'; +import { EncodeResult, SendResult } from '../results'; import { Connection } from '../connection'; export const enum SessionState { @@ -174,7 +174,15 @@ export abstract class CommonSession extends StateMachineState { export type InheritedProperties = Pick< IdentifiedSession, - 'id' | 'from' | 'to' | 'seq' | 'ack' | 'sendBuffer' | 'telemetry' | 'options' + | 'id' + | 'from' + | 'to' + | 'seq' + | 'ack' + | 'seqSent' + | 'sendBuffer' + | 'telemetry' + | 'options' >; export type SessionId = string; @@ -186,7 +194,7 @@ export interface IdentifiedSessionProps extends CommonSessionProps { seq: number; ack: number; seqSent: number; - sendBuffer: Array; + sendBuffer: Array; telemetry: TelemetryInfo; protocolVersion: ProtocolVersion; } @@ -211,7 +219,7 @@ export abstract class IdentifiedSession extends CommonSession { * Number of unique messages we've received this session (excluding handshake) */ ack: number; - sendBuffer: Array; + sendBuffer: Array; constructor(props: IdentifiedSessionProps) { const { @@ -255,9 +263,7 @@ export abstract class IdentifiedSession extends CommonSession { return metadata; } - constructMsg( - partialMsg: PartialTransportMessage, - ): TransportMessage { + encodeMsg(partialMsg: PartialTransportMessage): EncodeResult { const msg = { ...partialMsg, id: generateId(), @@ -267,9 +273,21 @@ export abstract class IdentifiedSession extends CommonSession { ack: this.ack, }; + const encoded = this.codec.toBuffer(msg); + if (!encoded.ok) { + return encoded; + } + this.seq++; - return msg; + return { + ok: true, + value: { + id: msg.id, + seq: msg.seq, + data: encoded.value, + }, + }; } nextSeq(): number { @@ -277,12 +295,16 @@ export abstract class IdentifiedSession extends CommonSession { } send(msg: PartialTransportMessage): SendResult { - const constructedMsg = this.constructMsg(msg); - this.sendBuffer.push(constructedMsg); + const encodeResult = this.encodeMsg(msg); + if (!encodeResult.ok) { + return encodeResult; + } + + this.sendBuffer.push(encodeResult.value); return { ok: true, - value: constructedMsg.id, + value: encodeResult.value.id, }; } diff --git a/transport/sessionStateMachine/stateMachine.test.ts b/transport/sessionStateMachine/stateMachine.test.ts index 725cd8e4..d3a8c407 100644 --- a/transport/sessionStateMachine/stateMachine.test.ts +++ b/transport/sessionStateMachine/stateMachine.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, test, vi } from 'vitest'; +import { assert, describe, expect, test, vi } from 'vitest'; import { payloadToTransportMessage, testingSessionOptions, @@ -1866,8 +1866,11 @@ describe('session state machine', () => { expect(onConnectionClosed).not.toHaveBeenCalled(); expect(onConnectionErrored).not.toHaveBeenCalled(); - const msg = session.constructMsg(payloadToTransportMessage('hello')); - session.conn.emitData(session.options.codec.toBuffer(msg)); + const encodeResult = session.encodeMsg( + payloadToTransportMessage('hello'), + ); + assert(encodeResult.ok); + session.conn.emitData(encodeResult.value.data); await waitFor(async () => { expect(onMessage).toHaveBeenCalledTimes(1); diff --git a/transport/sessionStateMachine/transitions.ts b/transport/sessionStateMachine/transitions.ts index b14a9c34..08c738e5 100644 --- a/transport/sessionStateMachine/transitions.ts +++ b/transport/sessionStateMachine/transitions.ts @@ -1,4 +1,4 @@ -import { OpaqueTransportMessage, TransportClientId } from '..'; +import { TransportClientId } from '..'; import { SessionConnecting, SessionConnectingListeners, @@ -38,7 +38,7 @@ import { SessionBackingOff, SessionBackingOffListeners, } from './SessionBackingOff'; -import { ProtocolVersion } from '../message'; +import { EncodedTransportMessage, ProtocolVersion } from '../message'; import { Tracer } from '@opentelemetry/api'; import { CodecMessageAdapter } from '../../codec'; @@ -84,7 +84,7 @@ export const SessionStateGraph = { ) => { const id = `session-${generateId()}`; const telemetry = createSessionTelemetryInfo(tracer, id, to, from); - const sendBuffer: Array = []; + const sendBuffer: Array = []; const session = new SessionNoConnection({ listeners, From 9e60326fa892c0c6da3fde9f0bcbdf5c7cc35cbc Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Sat, 7 Mar 2026 17:16:29 -0800 Subject: [PATCH 2/8] 0.214.1 --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index e590b98b..a5692630 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/river", - "version": "0.214.0", + "version": "0.214.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@replit/river", - "version": "0.214.0", + "version": "0.214.1", "license": "MIT", "dependencies": { "@msgpack/msgpack": "^3.1.2", diff --git a/package.json b/package.json index cdca6edd..2cd250f5 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@replit/river", "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.214.0", + "version": "0.214.1", "type": "module", "exports": { ".": { From eb6a01a1ecc798071fb20d370a8d99e6c3f53376 Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Sat, 7 Mar 2026 18:37:39 -0800 Subject: [PATCH 3/8] fix --- transport/message.ts | 1 + transport/sessionStateMachine/SessionConnected.ts | 8 ++++---- transport/sessionStateMachine/common.ts | 1 + 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/transport/message.ts b/transport/message.ts index d8ccdbc1..40861281 100644 --- a/transport/message.ts +++ b/transport/message.ts @@ -288,6 +288,7 @@ export type TransportClientId = string; export interface EncodedTransportMessage { id: string; seq: number; + msg: PartialTransportMessage; data: Uint8Array; } diff --git a/transport/sessionStateMachine/SessionConnected.ts b/transport/sessionStateMachine/SessionConnected.ts index 1633c017..bc249451 100644 --- a/transport/sessionStateMachine/SessionConnected.ts +++ b/transport/sessionStateMachine/SessionConnected.ts @@ -21,7 +21,7 @@ export interface SessionConnectedListeners { onConnectionClosed: () => void; onMessage: (msg: OpaqueTransportMessage) => void; onMessageSendFailure: ( - msg: Omit, + msg: PartialTransportMessage & { seq: number }, reason: string, ) => void; onInvalidMessage: (reason: string) => void; @@ -75,7 +75,7 @@ export class SessionConnected< const encodeResult = this.encodeMsg(msg); if (!encodeResult.ok) { this.listeners.onMessageSendFailure( - { id: 'unknown', seq: this.seq }, + { ...msg, seq: this.seq }, encodeResult.reason, ); @@ -90,7 +90,7 @@ export class SessionConnected< if (!sent) { const reason = 'failed to send message'; this.listeners.onMessageSendFailure( - { id: encodedMsg.id, seq: encodedMsg.seq }, + { ...encodedMsg.msg, seq: encodedMsg.seq }, reason, ); @@ -131,7 +131,7 @@ export class SessionConnected< if (!sent) { const reason = 'failed to send buffered message'; this.listeners.onMessageSendFailure( - { id: msg.id, seq: msg.seq }, + { ...msg.msg, seq: msg.seq }, reason, ); diff --git a/transport/sessionStateMachine/common.ts b/transport/sessionStateMachine/common.ts index bf6ce367..57e316f7 100644 --- a/transport/sessionStateMachine/common.ts +++ b/transport/sessionStateMachine/common.ts @@ -285,6 +285,7 @@ export abstract class IdentifiedSession extends CommonSession { value: { id: msg.id, seq: msg.seq, + msg: partialMsg, data: encoded.value, }, }; From b3512a35c0151eab98078111ff213c4bf7cd259b Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Sat, 7 Mar 2026 19:47:26 -0800 Subject: [PATCH 4/8] fix again --- __tests__/unserializable.test.ts | 32 ++++++++++++++++++++++++++++++++ router/client.ts | 25 +++++++++++++++---------- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/__tests__/unserializable.test.ts b/__tests__/unserializable.test.ts index 15981a0b..dc1a610f 100644 --- a/__tests__/unserializable.test.ts +++ b/__tests__/unserializable.test.ts @@ -85,6 +85,38 @@ describe('unserializable values in procedure handlers', () => { }); }); + test('client-side encode failure cleans up listeners', async () => { + const clientTransport = getClientTransport('client'); + const serverTransport = getServerTransport(); + const services = { svc: UnserializableServiceSchema }; + createServer(serverTransport, services); + const client = createClient( + clientTransport, + serverTransport.clientId, + ); + addPostTestCleanup(() => + cleanupTransports([clientTransport, serverTransport]), + ); + + const messageListenersBefore = + clientTransport.eventDispatcher.numberOfListeners('message'); + const sessionStatusListenersBefore = + clientTransport.eventDispatcher.numberOfListeners('sessionStatus'); + + // sending a Symbol as init payload will fail encoding on the client side + expect(() => + client.svc.returnSymbol.rpc({ extra: Symbol('x') } as any), + ).toThrow(); + + // listeners should not leak after the failed send + expect( + clientTransport.eventDispatcher.numberOfListeners('message'), + ).toEqual(messageListenersBefore); + expect( + clientTransport.eventDispatcher.numberOfListeners('sessionStatus'), + ).toEqual(sessionStatusListenersBefore); + }); + test('subscription handler writing symbol causes client disconnect', async () => { const clientTransport = getClientTransport('client'); const serverTransport = getServerTransport(); diff --git a/router/client.ts b/router/client.ts index 798974bc..818a02bc 100644 --- a/router/client.ts +++ b/router/client.ts @@ -508,16 +508,21 @@ function handleProc( transport.addEventListener('message', onMessage); transport.addEventListener('sessionStatus', onSessionStatus); - sessionScopedSend({ - streamId, - serviceName, - procedureName, - tracing: getPropagationContext(ctx), - payload: init, - controlFlags: procClosesWithInit - ? ControlFlags.StreamOpenBit | ControlFlags.StreamClosedBit - : ControlFlags.StreamOpenBit, - }); + try { + sessionScopedSend({ + streamId, + serviceName, + procedureName, + tracing: getPropagationContext(ctx), + payload: init, + controlFlags: procClosesWithInit + ? ControlFlags.StreamOpenBit | ControlFlags.StreamClosedBit + : ControlFlags.StreamOpenBit, + }); + } catch (e) { + cleanup(); + throw e; + } if (procClosesWithInit) { reqWritable.close(); From afaaadbddcd5d98abde5e3c38893cc483723b449 Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Sat, 7 Mar 2026 20:03:32 -0800 Subject: [PATCH 5/8] yes --- __tests__/unserializable.test.ts | 1 + transport/sessionStateMachine/SessionConnected.ts | 2 ++ 2 files changed, 3 insertions(+) diff --git a/__tests__/unserializable.test.ts b/__tests__/unserializable.test.ts index dc1a610f..c7754aed 100644 --- a/__tests__/unserializable.test.ts +++ b/__tests__/unserializable.test.ts @@ -105,6 +105,7 @@ describe('unserializable values in procedure handlers', () => { // sending a Symbol as init payload will fail encoding on the client side expect(() => + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any client.svc.returnSymbol.rpc({ extra: Symbol('x') } as any), ).toThrow(); diff --git a/transport/sessionStateMachine/SessionConnected.ts b/transport/sessionStateMachine/SessionConnected.ts index bc249451..4e93135c 100644 --- a/transport/sessionStateMachine/SessionConnected.ts +++ b/transport/sessionStateMachine/SessionConnected.ts @@ -74,6 +74,8 @@ export class SessionConnected< send(msg: PartialTransportMessage): SendResult { const encodeResult = this.encodeMsg(msg); if (!encodeResult.ok) { + // safety: onMessageSendFailure tears down the session via protocol error, + // which emits sessionStatus 'closing' and cleans up all procedure listeners. this.listeners.onMessageSendFailure( { ...msg, seq: this.seq }, encodeResult.reason, From 2a560ccc58b32fb64a1ec3dadb080e87bd02b8d8 Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Sat, 7 Mar 2026 20:55:17 -0800 Subject: [PATCH 6/8] fix --- testUtil/index.ts | 3 + transport/client.ts | 48 ++++++++++++++++ transport/server.ts | 12 ++++ .../sessionStateMachine/SessionConnected.ts | 14 +---- transport/sessionStateMachine/common.ts | 21 ++++++- transport/sessionStateMachine/index.ts | 5 +- .../sessionStateMachine/stateMachine.test.ts | 4 ++ transport/sessionStateMachine/transitions.ts | 6 +- transport/transport.ts | 57 +++++++++++++++---- 9 files changed, 142 insertions(+), 28 deletions(-) diff --git a/testUtil/index.ts b/testUtil/index.ts index dc3dee0b..ae9a4330 100644 --- a/testUtil/index.ts +++ b/testUtil/index.ts @@ -184,6 +184,9 @@ export function dummySession() { onSessionGracePeriodElapsed: () => { /* noop */ }, + onMessageSendFailure: () => { + /* noop */ + }, }, testingSessionOptions, currentProtocolVersion, diff --git a/transport/client.ts b/transport/client.ts index 5927e4f9..0d5d876c 100644 --- a/transport/client.ts +++ b/transport/client.ts @@ -107,6 +107,18 @@ export abstract class ClientTransport< onSessionGracePeriodElapsed: () => { this.onSessionGracePeriodElapsed(session); }, + onMessageSendFailure: (msg, reason) => { + this.log?.error(`failed to send message: ${reason}`, { + ...session.loggingMetadata, + transportMessage: msg, + }); + + this.protocolError({ + type: ProtocolError.MessageSendFailure, + message: reason, + }); + this.deleteSession(session, { unhealthy: true }); + }, }, this.options, currentProtocolVersion, @@ -186,6 +198,18 @@ export abstract class ClientTransport< onSessionGracePeriodElapsed: () => { this.onSessionGracePeriodElapsed(handshakingSession); }, + onMessageSendFailure: (msg, reason) => { + this.log?.error(`failed to send message: ${reason}`, { + ...handshakingSession.loggingMetadata, + transportMessage: msg, + }); + + this.protocolError({ + type: ProtocolError.MessageSendFailure, + message: reason, + }); + this.deleteSession(handshakingSession, { unhealthy: true }); + }, }, ); @@ -395,6 +419,18 @@ export abstract class ClientTransport< onSessionGracePeriodElapsed: () => { this.onSessionGracePeriodElapsed(backingOffSession); }, + onMessageSendFailure: (msg, reason) => { + this.log?.error(`failed to send message: ${reason}`, { + ...backingOffSession.loggingMetadata, + transportMessage: msg, + }); + + this.protocolError({ + type: ProtocolError.MessageSendFailure, + message: reason, + }); + this.deleteSession(backingOffSession, { unhealthy: true }); + }, }, ); @@ -470,6 +506,18 @@ export abstract class ClientTransport< onSessionGracePeriodElapsed: () => { this.onSessionGracePeriodElapsed(connectingSession); }, + onMessageSendFailure: (msg, reason) => { + this.log?.error(`failed to send message: ${reason}`, { + ...connectingSession.loggingMetadata, + transportMessage: msg, + }); + + this.protocolError({ + type: ProtocolError.MessageSendFailure, + message: reason, + }); + this.deleteSession(connectingSession, { unhealthy: true }); + }, }, ); diff --git a/transport/server.ts b/transport/server.ts index 8a7f8554..78cf8042 100644 --- a/transport/server.ts +++ b/transport/server.ts @@ -408,6 +408,18 @@ export abstract class ServerTransport< onSessionGracePeriodElapsed: () => { this.onSessionGracePeriodElapsed(noConnectionSession); }, + onMessageSendFailure: (msg, reason) => { + this.log?.error(`failed to send message: ${reason}`, { + ...noConnectionSession.loggingMetadata, + transportMessage: msg, + }); + + this.protocolError({ + type: ProtocolError.MessageSendFailure, + message: reason, + }); + this.deleteSession(noConnectionSession, { unhealthy: true }); + }, }, ); diff --git a/transport/sessionStateMachine/SessionConnected.ts b/transport/sessionStateMachine/SessionConnected.ts index 4e93135c..1316ca5f 100644 --- a/transport/sessionStateMachine/SessionConnected.ts +++ b/transport/sessionStateMachine/SessionConnected.ts @@ -9,6 +9,7 @@ import { } from '../message'; import { IdentifiedSession, + IdentifiedSessionListeners, IdentifiedSessionProps, SessionState, } from './common'; @@ -16,14 +17,10 @@ import { Connection } from '../connection'; import { SpanStatusCode } from '@opentelemetry/api'; import { SendBufferResult, SendResult } from '../results'; -export interface SessionConnectedListeners { +export interface SessionConnectedListeners extends IdentifiedSessionListeners { onConnectionErrored: (err: unknown) => void; onConnectionClosed: () => void; onMessage: (msg: OpaqueTransportMessage) => void; - onMessageSendFailure: ( - msg: PartialTransportMessage & { seq: number }, - reason: string, - ) => void; onInvalidMessage: (reason: string) => void; } @@ -74,13 +71,6 @@ export class SessionConnected< send(msg: PartialTransportMessage): SendResult { const encodeResult = this.encodeMsg(msg); if (!encodeResult.ok) { - // safety: onMessageSendFailure tears down the session via protocol error, - // which emits sessionStatus 'closing' and cleans up all procedure listeners. - this.listeners.onMessageSendFailure( - { ...msg, seq: this.seq }, - encodeResult.reason, - ); - return encodeResult; } diff --git a/transport/sessionStateMachine/common.ts b/transport/sessionStateMachine/common.ts index 57e316f7..f97bb1f7 100644 --- a/transport/sessionStateMachine/common.ts +++ b/transport/sessionStateMachine/common.ts @@ -187,6 +187,13 @@ export type InheritedProperties = Pick< export type SessionId = string; +export interface IdentifiedSessionListeners { + onMessageSendFailure: ( + msg: PartialTransportMessage & { seq: number }, + reason: string, + ) => void; +} + // all sessions where we know the other side's client id export interface IdentifiedSessionProps extends CommonSessionProps { id: SessionId; @@ -197,6 +204,7 @@ export interface IdentifiedSessionProps extends CommonSessionProps { sendBuffer: Array; telemetry: TelemetryInfo; protocolVersion: ProtocolVersion; + listeners: IdentifiedSessionListeners; } export abstract class IdentifiedSession extends CommonSession { @@ -204,6 +212,7 @@ export abstract class IdentifiedSession extends CommonSession { readonly telemetry: TelemetryInfo; readonly to: TransportClientId; readonly protocolVersion: ProtocolVersion; + listeners: IdentifiedSessionListeners; /** * Index of the message we will send next (excluding handshake) @@ -232,6 +241,7 @@ export abstract class IdentifiedSession extends CommonSession { log, protocolVersion, seqSent: messagesSent, + listeners, } = props; super(props); this.id = id; @@ -243,6 +253,7 @@ export abstract class IdentifiedSession extends CommonSession { this.log = log; this.protocolVersion = protocolVersion; this.seqSent = messagesSent; + this.listeners = listeners; } get loggingMetadata(): MessageMetadata { @@ -275,6 +286,13 @@ export abstract class IdentifiedSession extends CommonSession { const encoded = this.codec.toBuffer(msg); if (!encoded.ok) { + // safety: onMessageSendFailure tears down the session via protocol error, + // which emits sessionStatus 'closing' and cleans up all procedure listeners. + this.listeners.onMessageSendFailure( + { ...partialMsg, seq: this.seq }, + encoded.reason, + ); + return encoded; } @@ -320,7 +338,8 @@ export abstract class IdentifiedSession extends CommonSession { } } -export interface IdentifiedSessionWithGracePeriodListeners { +export interface IdentifiedSessionWithGracePeriodListeners + extends IdentifiedSessionListeners { onSessionGracePeriodElapsed: () => void; } diff --git a/transport/sessionStateMachine/index.ts b/transport/sessionStateMachine/index.ts index 9d75122a..03830f81 100644 --- a/transport/sessionStateMachine/index.ts +++ b/transport/sessionStateMachine/index.ts @@ -1,7 +1,10 @@ export { SessionState } from './common'; export { type SessionWaitingForHandshake } from './SessionWaitingForHandshake'; export { type SessionConnecting } from './SessionConnecting'; -export { type SessionNoConnection } from './SessionNoConnection'; +export { + type SessionNoConnection, + type SessionNoConnectionListeners, +} from './SessionNoConnection'; export { type SessionHandshaking } from './SessionHandshaking'; export { type SessionConnected } from './SessionConnected'; export { diff --git a/transport/sessionStateMachine/stateMachine.test.ts b/transport/sessionStateMachine/stateMachine.test.ts index d3a8c407..171280e9 100644 --- a/transport/sessionStateMachine/stateMachine.test.ts +++ b/transport/sessionStateMachine/stateMachine.test.ts @@ -99,6 +99,7 @@ function getPendingMockConnection(): PendingMockConnectionHandle { function createSessionNoConnectionListeners(): SessionNoConnectionListeners { return { onSessionGracePeriodElapsed: vi.fn(), + onMessageSendFailure: vi.fn(), }; } @@ -106,6 +107,7 @@ function createSessionBackingOffListeners(): SessionBackingOffListeners { return { onBackoffFinished: vi.fn(), onSessionGracePeriodElapsed: vi.fn(), + onMessageSendFailure: vi.fn(), }; } @@ -115,6 +117,7 @@ function createSessionConnectingListeners(): SessionConnectingListeners { onConnectionFailed: vi.fn(), onConnectionTimeout: vi.fn(), onSessionGracePeriodElapsed: vi.fn(), + onMessageSendFailure: vi.fn(), }; } @@ -126,6 +129,7 @@ function createSessionHandshakingListeners(): SessionHandshakingListeners { onConnectionErrored: vi.fn(), onHandshakeTimeout: vi.fn(), onSessionGracePeriodElapsed: vi.fn(), + onMessageSendFailure: vi.fn(), }; } diff --git a/transport/sessionStateMachine/transitions.ts b/transport/sessionStateMachine/transitions.ts index 08c738e5..b85745ee 100644 --- a/transport/sessionStateMachine/transitions.ts +++ b/transport/sessionStateMachine/transitions.ts @@ -44,7 +44,7 @@ import { CodecMessageAdapter } from '../../codec'; function inheritSharedSession( session: IdentifiedSession, -): IdentifiedSessionProps { +): Omit { return { id: session.id, from: session.from, @@ -255,7 +255,7 @@ export const SessionStateGraph = { ): SessionConnected => { const conn = pendingSession.conn; const { from, options } = pendingSession; - const carriedState: IdentifiedSessionProps = oldSession + const carriedState: Omit = oldSession ? // old session exists, inherit state inheritSharedSession(oldSession) : // old session does not exist, create new state @@ -279,7 +279,7 @@ export const SessionStateGraph = { log: pendingSession.log, protocolVersion, codec: new CodecMessageAdapter(options.codec), - } satisfies IdentifiedSessionProps); + } satisfies Omit); pendingSession._handleStateExit(); oldSession?._handleStateExit(); diff --git a/transport/transport.ts b/transport/transport.ts index 1dd944fe..85e80083 100644 --- a/transport/transport.ts +++ b/transport/transport.ts @@ -10,7 +10,13 @@ import { LoggingLevel, createLogProxy, } from '../logging/log'; -import { EventDispatcher, EventHandler, EventMap, EventTypes } from './events'; +import { + EventDispatcher, + EventHandler, + EventMap, + EventTypes, + ProtocolError, +} from './events'; import { ProvidedTransportOptions, TransportOptions, @@ -21,6 +27,7 @@ import { SessionConnecting, SessionHandshaking, SessionNoConnection, + SessionNoConnectionListeners, SessionState, } from './sessionStateMachine'; import { Connection } from './connection'; @@ -277,6 +284,18 @@ export abstract class Transport { onSessionGracePeriodElapsed: () => { this.onSessionGracePeriodElapsed(noConnectionSession); }, + onMessageSendFailure: (msg, reason) => { + this.log?.error(`failed to send message: ${reason}`, { + ...noConnectionSession.loggingMetadata, + transportMessage: msg, + }); + + this.protocolError({ + type: ProtocolError.MessageSendFailure, + message: reason, + }); + this.deleteSession(noConnectionSession, { unhealthy: true }); + }, }); this.updateSession(noConnectionSession); @@ -289,20 +308,36 @@ export abstract class Transport { ): SessionNoConnection { // transition to no connection let noConnectionSession: SessionNoConnection; + const listeners: SessionNoConnectionListeners = { + onSessionGracePeriodElapsed: () => { + this.onSessionGracePeriodElapsed(noConnectionSession); + }, + onMessageSendFailure: (msg, reason) => { + this.log?.error(`failed to send message: ${reason}`, { + ...noConnectionSession.loggingMetadata, + transportMessage: msg, + }); + + this.protocolError({ + type: ProtocolError.MessageSendFailure, + message: reason, + }); + this.deleteSession(noConnectionSession, { unhealthy: true }); + }, + }; + if (session.state === SessionState.Handshaking) { noConnectionSession = - SessionStateGraph.transition.HandshakingToNoConnection(session, { - onSessionGracePeriodElapsed: () => { - this.onSessionGracePeriodElapsed(noConnectionSession); - }, - }); + SessionStateGraph.transition.HandshakingToNoConnection( + session, + listeners, + ); } else { noConnectionSession = - SessionStateGraph.transition.ConnectedToNoConnection(session, { - onSessionGracePeriodElapsed: () => { - this.onSessionGracePeriodElapsed(noConnectionSession); - }, - }); + SessionStateGraph.transition.ConnectedToNoConnection( + session, + listeners, + ); } this.updateSession(noConnectionSession); From bf48a1da12a0f5a8ef3b12daf17e37648edc0c8a Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Sun, 8 Mar 2026 15:47:48 -0700 Subject: [PATCH 7/8] test --- .../sessionStateMachine/SessionHandshaking.ts | 19 +++- .../SessionWaitingForHandshake.ts | 25 +++-- transport/sessionStateMachine/common.ts | 26 ----- .../sessionStateMachine/stateMachine.test.ts | 97 +++++++++++++++++++ 4 files changed, 132 insertions(+), 35 deletions(-) diff --git a/transport/sessionStateMachine/SessionHandshaking.ts b/transport/sessionStateMachine/SessionHandshaking.ts index b7c14a02..50359952 100644 --- a/transport/sessionStateMachine/SessionHandshaking.ts +++ b/transport/sessionStateMachine/SessionHandshaking.ts @@ -9,7 +9,6 @@ import { IdentifiedSessionWithGracePeriod, IdentifiedSessionWithGracePeriodListeners, IdentifiedSessionWithGracePeriodProps, - sendMessage, SessionState, } from './common'; import { SendResult } from '../results'; @@ -83,7 +82,23 @@ export class SessionHandshaking< }; sendHandshake(msg: TransportMessage): SendResult { - return sendMessage(this.conn, this.codec, msg); + const buff = this.codec.toBuffer(msg); + if (!buff.ok) { + return buff; + } + + const sent = this.conn.send(buff.value); + if (!sent) { + return { + ok: false, + reason: 'failed to send handshake', + }; + } + + return { + ok: true, + value: msg.id, + }; } _handleStateExit(): void { diff --git a/transport/sessionStateMachine/SessionWaitingForHandshake.ts b/transport/sessionStateMachine/SessionWaitingForHandshake.ts index d88685de..6a8107d9 100644 --- a/transport/sessionStateMachine/SessionWaitingForHandshake.ts +++ b/transport/sessionStateMachine/SessionWaitingForHandshake.ts @@ -5,12 +5,7 @@ import { OpaqueTransportMessage, TransportMessage, } from '../message'; -import { - CommonSession, - CommonSessionProps, - sendMessage, - SessionState, -} from './common'; +import { CommonSession, CommonSessionProps, SessionState } from './common'; import { SendResult } from '../results'; export interface SessionWaitingForHandshakeListeners { @@ -84,7 +79,23 @@ export class SessionWaitingForHandshake< }; sendHandshake(msg: TransportMessage): SendResult { - return sendMessage(this.conn, this.codec, msg); + const buff = this.codec.toBuffer(msg); + if (!buff.ok) { + return buff; + } + + const sent = this.conn.send(buff.value); + if (!sent) { + return { + ok: false, + reason: 'failed to send handshake', + }; + } + + return { + ok: true, + value: msg.id, + }; } _handleStateExit(): void { diff --git a/transport/sessionStateMachine/common.ts b/transport/sessionStateMachine/common.ts index f97bb1f7..e4b46e23 100644 --- a/transport/sessionStateMachine/common.ts +++ b/transport/sessionStateMachine/common.ts @@ -5,13 +5,11 @@ import { PartialTransportMessage, ProtocolVersion, TransportClientId, - TransportMessage, } from '../message'; import { Codec, CodecMessageAdapter } from '../../codec'; import { generateId } from '../id'; import { Tracer } from '@opentelemetry/api'; import { EncodeResult, SendResult } from '../results'; -import { Connection } from '../connection'; export const enum SessionState { NoConnection = 'NoConnection', @@ -378,27 +376,3 @@ export abstract class IdentifiedSessionWithGracePeriod extends IdentifiedSession super._handleClose(); } } - -export function sendMessage( - conn: Connection, - codec: CodecMessageAdapter, - msg: TransportMessage, -): SendResult { - const buff = codec.toBuffer(msg); - if (!buff.ok) { - return buff; - } - - const sent = conn.send(buff.value); - if (!sent) { - return { - ok: false, - reason: 'failed to send message', - }; - } - - return { - ok: true, - value: msg.id, - }; -} diff --git a/transport/sessionStateMachine/stateMachine.test.ts b/transport/sessionStateMachine/stateMachine.test.ts index 171280e9..0d825daa 100644 --- a/transport/sessionStateMachine/stateMachine.test.ts +++ b/transport/sessionStateMachine/stateMachine.test.ts @@ -1811,6 +1811,103 @@ describe('session state machine', () => { }); }); + test('handshaking sendHandshake: codec failure does not corrupt seq and subsequent success works', async () => { + const sessionHandle = await createSessionHandshaking(); + const session = sessionHandle.session; + + // buffer some messages during handshake + session.send(payloadToTransportMessage('hello')); + session.send(payloadToTransportMessage('world')); + expect(session.seq).toBe(2); + expect(session.ack).toBe(0); + expect(session.sendBuffer.length).toBe(2); + + const msg = handshakeRequestMessage({ + from: 'from', + to: 'to', + sessionId: 'clientSessionId', + expectedSessionState: { + nextExpectedSeq: 0, + nextSentSeq: 0, + }, + }); + + // make codec.toBuffer fail + const spy = vi + .spyOn(session.codec, 'toBuffer') + .mockReturnValue({ ok: false, reason: 'encode error' }); + + const res = session.sendHandshake(msg); + expect(res.ok).toBe(false); + assert(!res.ok); + expect(res.reason).toBe('encode error'); + expect(session.conn.send).not.toHaveBeenCalled(); + + // seq/ack/sendBuffer should be unchanged + expect(session.seq).toBe(2); + expect(session.ack).toBe(0); + expect(session.sendBuffer.length).toBe(2); + + // restore codec and retry handshake + spy.mockRestore(); + const retryRes = session.sendHandshake(msg); + expect(retryRes.ok).toBe(true); + + // transition to connected and verify messages work + const connectedListeners = createSessionConnectedListeners(); + const connected = SessionStateGraph.transition.HandshakingToConnected( + session, + connectedListeners, + ); + + expect(connected.state).toBe(SessionState.Connected); + expect(connected.seq).toBe(2); + expect(connected.ack).toBe(0); + + // flush buffered messages first + const bufferRes = connected.sendBufferedMessages(); + expect(bufferRes.ok).toBe(true); + + // send a new message in connected state + const sendRes = connected.send(payloadToTransportMessage('after')); + expect(sendRes.ok).toBe(true); + expect(connected.seq).toBe(3); + // 1 handshake retry + 2 buffered + 1 new = 4 + expect(connected.conn.send).toHaveBeenCalledTimes(4); + }); + + test('pending identification sendHandshake: codec failure does not prevent subsequent success', () => { + const sessionHandle = createSessionWaitingForHandshake(); + const session = sessionHandle.session; + + const msg = handshakeRequestMessage({ + from: 'from', + to: 'to', + sessionId: 'clientSessionId', + expectedSessionState: { + nextExpectedSeq: 0, + nextSentSeq: 0, + }, + }); + + // make codec.toBuffer fail + const spy = vi + .spyOn(session.codec, 'toBuffer') + .mockReturnValue({ ok: false, reason: 'encode error' }); + + const res = session.sendHandshake(msg); + expect(res.ok).toBe(false); + assert(!res.ok); + expect(res.reason).toBe('encode error'); + expect(session.conn.send).not.toHaveBeenCalled(); + + // restore codec and retry handshake + spy.mockRestore(); + const retryRes = session.sendHandshake(msg); + expect(retryRes.ok).toBe(true); + expect(session.conn.send).toHaveBeenCalledTimes(1); + }); + test('connected event listeners: connectionErrored', async () => { const sessionHandle = await createSessionConnected(); const session = sessionHandle.session; From 98a1c052f3bb6e05624cae2c2b93f471845bf6d6 Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Sun, 8 Mar 2026 15:47:55 -0700 Subject: [PATCH 8/8] 0.215.0 --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index a5692630..ace3e0bd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/river", - "version": "0.214.1", + "version": "0.215.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@replit/river", - "version": "0.214.1", + "version": "0.215.0", "license": "MIT", "dependencies": { "@msgpack/msgpack": "^3.1.2", diff --git a/package.json b/package.json index 2cd250f5..e1d6b519 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@replit/river", "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.214.1", + "version": "0.215.0", "type": "module", "exports": { ".": {