Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions crates/bindings-typescript/src/sdk/client_api/v3.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

162 changes: 125 additions & 37 deletions crates/bindings-typescript/src/sdk/db_connection_impl.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ConnectionId, ProductBuilder, ProductType } from '../';
import { AlgebraicType, type ComparablePrimitive } from '../';
import { BinaryReader } from '../';
import { BinaryWriter } from '../';
import BinaryReader from '../lib/binary_reader.ts';
import BinaryWriter from '../lib/binary_writer.ts';
import {
BsatnRowList,
ClientMessage,
Expand Down Expand Up @@ -60,6 +60,18 @@ import type { ProceduresView } from './procedures.ts';
import type { Values } from '../lib/type_util.ts';
import type { TransactionUpdate } from './client_api/types.ts';
import { InternalError, SenderError } from '../lib/errors.ts';
import {
normalizeWsProtocol,
PREFERRED_WS_PROTOCOLS,
V2_WS_PROTOCOL,
V3_WS_PROTOCOL,
type NegotiatedWsProtocol,
} from './websocket_protocols';
import {
countClientMessagesForV3Frame,
decodeServerMessagesV3,
encodeClientMessagesV3,
} from './websocket_v3_frames.ts';

export {
DbConnectionBuilder,
Expand Down Expand Up @@ -117,6 +129,9 @@ const CLIENT_MESSAGE_CALL_REDUCER_TAG =
getClientMessageVariantTag('CallReducer');
const CLIENT_MESSAGE_CALL_PROCEDURE_TAG =
getClientMessageVariantTag('CallProcedure');
// Keep individual v3 frames bounded so one burst does not monopolize the send
// path or create very large websocket writes.
const MAX_V3_OUTBOUND_FRAME_BYTES = 256 * 1024;

export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
implements DbContext<RemoteModule>
Expand Down Expand Up @@ -172,6 +187,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
#inboundQueueOffset = 0;
#isDrainingInboundQueue = false;
#outboundQueue: Uint8Array[] = [];
#isOutboundFlushScheduled = false;
#negotiatedWsProtocol: NegotiatedWsProtocol = V2_WS_PROTOCOL;
#subscriptionManager = new SubscriptionManager<RemoteModule>();
#remoteModule: RemoteModule;
#reducerCallbacks = new Map<
Expand All @@ -198,6 +215,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
#sourceNameToTableDef: Record<string, Values<RemoteModule['tables']>>;
#messageReader = new BinaryReader(new Uint8Array());
#rowListReader = new BinaryReader(new Uint8Array());
#clientFrameEncoder = new BinaryWriter(1024);
#boundSubscriptionBuilder!: () => SubscriptionBuilderImpl<RemoteModule>;
#boundDisconnect!: () => void;

Expand Down Expand Up @@ -296,7 +314,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
this.wsPromise = createWSFn({
url,
nameOrAddress,
wsProtocol: 'v2.bsatn.spacetimedb',
wsProtocol: [...PREFERRED_WS_PROTOCOLS],
authToken: token,
compression: compression,
lightMode: lightMode,
Expand Down Expand Up @@ -595,23 +613,87 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
}

#flushOutboundQueue(wsResolved: WebsocketAdapter): void {
if (this.#negotiatedWsProtocol === V3_WS_PROTOCOL) {
this.#flushOutboundQueueV3(wsResolved);
return;
}
this.#flushOutboundQueueV2(wsResolved);
}

#flushOutboundQueueV2(wsResolved: WebsocketAdapter): void {
const pending = this.#outboundQueue.splice(0);
for (const message of pending) {
wsResolved.send(message);
}
}

#flushOutboundQueueV3(wsResolved: WebsocketAdapter): void {
if (this.#outboundQueue.length === 0) {
return;
}

// Emit at most one bounded frame per flush. If more encoded v2 messages
// remain in the queue, they are sent by a later scheduled flush so inbound
// traffic and other tasks get a chance to run between websocket writes.
const batchSize = countClientMessagesForV3Frame(
this.#outboundQueue,
MAX_V3_OUTBOUND_FRAME_BYTES
);
const pending = this.#outboundQueue.splice(0, batchSize);
wsResolved.send(encodeClientMessagesV3(this.#clientFrameEncoder, pending));

if (this.#outboundQueue.length > 0) {
this.#scheduleDeferredOutboundFlush();
}
}

#scheduleOutboundFlush(): void {
this.#scheduleOutboundFlushWith('microtask');
}

#scheduleDeferredOutboundFlush(): void {
this.#scheduleOutboundFlushWith('next-task');
}

#scheduleOutboundFlushWith(schedule: 'microtask' | 'next-task'): void {
if (this.#isOutboundFlushScheduled) {
return;
}

this.#isOutboundFlushScheduled = true;
const flush = () => {
this.#isOutboundFlushScheduled = false;
if (this.ws && this.isActive) {
this.#flushOutboundQueue(this.ws);
}
};

// The first v3 flush stays on the current turn so same-tick sends coalesce.
// Follow-up flushes after a size-capped frame yield to the next task so we
// do not sit in a tight send loop while inbound websocket work is waiting.
if (schedule === 'next-task') {
setTimeout(flush, 0);
} else {
queueMicrotask(flush);
}
}

#reducerArgsEncoder = new BinaryWriter(1024);
#clientMessageEncoder = new BinaryWriter(1024);
#sendEncodedMessage(encoded: Uint8Array, describe: () => string): void {
stdbLogger('trace', describe);
if (this.ws && this.isActive) {
if (this.#outboundQueue.length) this.#flushOutboundQueue(this.ws);
if (this.#negotiatedWsProtocol === V2_WS_PROTOCOL) {
if (this.#outboundQueue.length) this.#flushOutboundQueue(this.ws);
this.ws.send(encoded);
return;
}

stdbLogger('trace', describe);
this.ws.send(encoded);
this.#outboundQueue.push(encoded.slice());
this.#scheduleOutboundFlush();
} else {
stdbLogger('trace', describe);
// use slice() to copy, in case the clientMessageEncoder's buffer gets used
// Use slice() to copy, in case the clientMessageEncoder's buffer gets reused
// before the connection opens or before a v3 microbatch flush runs.
this.#outboundQueue.push(encoded.slice());
}
}
Expand Down Expand Up @@ -681,6 +763,9 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
* Handles WebSocket onOpen event.
*/
#handleOnOpen(): void {
if (this.ws) {
this.#negotiatedWsProtocol = normalizeWsProtocol(this.ws.protocol);
}
this.isActive = true;
if (this.ws) {
this.#flushOutboundQueue(this.ws);
Expand Down Expand Up @@ -728,7 +813,17 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
);
}

#processMessage(data: Uint8Array): void {
#dispatchPendingCallbacks(callbacks: readonly PendingCallback[]): void {
stdbLogger(
'trace',
() => `Calling ${callbacks.length} triggered row callbacks`
);
for (const callback of callbacks) {
callback.cb();
}
}

#processV2Message(data: Uint8Array): void {
const reader = this.#messageReader;
reader.reset(data);
const serverMessage = ServerMessage.deserialize(reader);
Expand Down Expand Up @@ -769,13 +864,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
const callbacks = this.#applyTableUpdates(tableUpdates, eventContext);
const { event: _, ...subscriptionEventContext } = eventContext;
subscription.emitter.emit('applied', subscriptionEventContext);
stdbLogger(
'trace',
() => `Calling ${callbacks.length} triggered row callbacks`
);
for (const callback of callbacks) {
callback.cb();
}
this.#dispatchPendingCallbacks(callbacks);
break;
}
case 'UnsubscribeApplied': {
Expand All @@ -801,13 +890,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
const { event: _, ...subscriptionEventContext } = eventContext;
subscription.emitter.emit('end', subscriptionEventContext);
this.#subscriptionManager.subscriptions.delete(querySetId);
stdbLogger(
'trace',
() => `Calling ${callbacks.length} triggered row callbacks`
);
for (const callback of callbacks) {
callback.cb();
}
this.#dispatchPendingCallbacks(callbacks);
break;
}
case 'SubscriptionError': {
Expand Down Expand Up @@ -861,13 +944,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
eventContext,
serverMessage.value
);
stdbLogger(
'trace',
() => `Calling ${callbacks.length} triggered row callbacks`
);
for (const callback of callbacks) {
callback.cb();
}
this.#dispatchPendingCallbacks(callbacks);
break;
}
case 'ReducerResult': {
Expand Down Expand Up @@ -899,13 +976,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
eventContext,
result.value.transactionUpdate
);
stdbLogger(
'trace',
() => `Calling ${callbacks.length} triggered row callbacks`
);
for (const callback of callbacks) {
callback.cb();
}
this.#dispatchPendingCallbacks(callbacks);
}
this.#reducerCallInfo.delete(requestId);
const cb = this.#reducerCallbacks.get(requestId);
Expand Down Expand Up @@ -934,6 +1005,23 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
}
}

#processMessage(data: Uint8Array): void {
if (this.#negotiatedWsProtocol !== V3_WS_PROTOCOL) {
this.#processV2Message(data);
return;
}

const messages = decodeServerMessagesV3(this.#messageReader, data);
stdbLogger(
'trace',
() =>
`Processing server frame: ${messages.length === 1 ? 'single' : `batch(${messages.length})`}`
);
for (const message of messages) {
this.#processV2Message(message);
}
}

/**
* Handles WebSocket onMessage event.
* @param wsMessage MessageEvent object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { decompress } from './decompress';
import { resolveWS } from './ws';

export interface WebsocketAdapter {
readonly protocol: string;
send(msg: Uint8Array): void;
close(): void;

Expand All @@ -12,6 +13,10 @@ export interface WebsocketAdapter {
}

export class WebsocketDecompressAdapter implements WebsocketAdapter {
get protocol(): string {
return this.#ws.protocol;
}

set onclose(handler: (ev: CloseEvent) => void) {
this.#ws.onclose = handler;
}
Expand Down Expand Up @@ -73,7 +78,7 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter {
confirmedReads,
}: {
url: URL;
wsProtocol: string;
wsProtocol: string | string[];
nameOrAddress: string;
authToken?: string;
compression: 'gzip' | 'none';
Expand Down
25 changes: 25 additions & 0 deletions crates/bindings-typescript/src/sdk/websocket_protocols.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { stdbLogger } from './logger.ts';

export const V2_WS_PROTOCOL = 'v2.bsatn.spacetimedb';
export const V3_WS_PROTOCOL = 'v3.bsatn.spacetimedb';
export const PREFERRED_WS_PROTOCOLS = [V3_WS_PROTOCOL, V2_WS_PROTOCOL] as const;

export type NegotiatedWsProtocol =
| typeof V2_WS_PROTOCOL
| typeof V3_WS_PROTOCOL;

export function normalizeWsProtocol(protocol: string): NegotiatedWsProtocol {
if (protocol === V3_WS_PROTOCOL) {
return V3_WS_PROTOCOL;
}
// We treat an empty negotiated subprotocol as legacy v2 for compatibility.
if (protocol === '' || protocol === V2_WS_PROTOCOL) {
return V2_WS_PROTOCOL;
}

stdbLogger(
'warn',
`Unexpected websocket subprotocol "${protocol}", falling back to ${V2_WS_PROTOCOL}.`
);
return V2_WS_PROTOCOL;
}
Loading
Loading