Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/bindings-typescript/src/sdk/client_api/index.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 71 additions & 19 deletions crates/bindings-typescript/src/sdk/db_connection_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import type { RowType, UntypedTableDef } from '../lib/table.ts';
import { toCamelCase } from '../lib/util.ts';
import type { ProceduresView } from './procedures.ts';
import type { Values } from '../lib/type_util.ts';
import type { TransactionUpdate } from './client_api/types.ts';

export {
DbConnectionBuilder,
Expand Down Expand Up @@ -154,6 +155,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
number,
(result: ReducerResultMessage['result']) => void
>();
#reducerCallInfo = new Map<number, { name: string; args: object }>();
#procedureCallbacks = new Map<number, ProcedureCallback>();
#rowDeserializers: Record<string, Deserializer<any>>;
#reducerArgsSerializers: Record<
Expand Down Expand Up @@ -313,7 +315,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
const writer = new BinaryWriter(1024);
serializeArgs(writer, params);
const argsBuffer = writer.getBuffer();
return this.callReducer(reducerName, argsBuffer);
return this.callReducer(reducerName, argsBuffer, params);
};
}

Expand Down Expand Up @@ -409,7 +411,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
ClientMessage.Unsubscribe({
querySetId: { id: querySetId },
requestId,
flags: UnsubscribeFlags.Default,
flags: UnsubscribeFlags.SendDroppedRows,
Copy link
Contributor

@cloutiertyler cloutiertyler Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean? And why are we now enabling it? I'm not so familiar with v2.

I have a vague recollection that it's required because we don't have per query storage, is that right?

})
);
}
Expand Down Expand Up @@ -460,16 +462,17 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
return rows;
}

// Take a bunch of table updates and ensure that there is at most one update per table.
#mergeTableUpdates(
updates: CacheTableUpdate<UntypedTableDef>[]
): CacheTableUpdate<UntypedTableDef>[] {
const merged = new Map<string, Operation[]>();
for (const update of updates) {
const ops = merged.get(update.tableName);
if (ops) {
ops.push(...update.operations);
for (const op of update.operations) ops.push(op);
} else {
merged.set(update.tableName, [...update.operations]);
merged.set(update.tableName, update.operations.slice());
}
}
return Array.from(merged, ([tableName, operations]) => ({
Expand Down Expand Up @@ -577,6 +580,24 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
return pendingCallbacks;
}

#applyTransactionUpdates(
eventContext: EventContextInterface<RemoteModule>,
tu: TransactionUpdate
): PendingCallback[] {
const all_updates: CacheTableUpdate<UntypedTableDef>[] = [];
Copy link
Contributor

@cloutiertyler cloutiertyler Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: use camelCase, we really ought to have a linter for this, but alas

for (const querySetUpdate of tu.querySets) {
const tableUpdates = this.#querySetUpdateToTableUpdates(querySetUpdate);
for (const update of tableUpdates) {
all_updates.push(update);
}
// TODO: When we have per-query storage, we will want to apply the per-query events here.
}
return this.#applyTableUpdates(
this.#mergeTableUpdates(all_updates),
eventContext
);
}

async #processMessage(data: Uint8Array): Promise<void> {
const serverMessage = ServerMessage.deserialize(new BinaryReader(data));
switch (serverMessage.tag) {
Expand Down Expand Up @@ -664,30 +685,44 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
case 'TransactionUpdate': {
const event: Event<never> = { tag: 'UnknownTransaction' };
const eventContext = this.#makeEventContext(event);
for (const querySetUpdate of serverMessage.value.querySets) {
const tableUpdates =
this.#querySetUpdateToTableUpdates(querySetUpdate);
const callbacks = this.#applyTableUpdates(tableUpdates, eventContext);
for (const callback of callbacks) {
callback.cb();
}
const callbacks = this.#applyTransactionUpdates(
eventContext,
serverMessage.value
);
for (const callback of callbacks) {
callback.cb();
}
break;
}
case 'ReducerResult': {
const { requestId, result } = serverMessage.value;

if (result.tag === 'Ok') {
const tableUpdates = result.value.transactionUpdate.querySets.flatMap(
qs => this.#querySetUpdateToTableUpdates(qs)
const reducerInfo = this.#reducerCallInfo.get(requestId);
const event: Event<any> = reducerInfo
? {
tag: 'Reducer',
value: {
timestamp: serverMessage.value.timestamp,
outcome: result,
reducer: {
name: reducerInfo.name,
args: reducerInfo.args,
},
},
}
: { tag: 'UnknownTransaction' };
const eventContext = this.#makeEventContext(event as any);

const callbacks = this.#applyTransactionUpdates(
eventContext,
result.value.transactionUpdate
);
const event: Event<never> = { tag: 'UnknownTransaction' };
const eventContext = this.#makeEventContext(event);
const callbacks = this.#applyTableUpdates(tableUpdates, eventContext);
for (const callback of callbacks) {
callback.cb();
}
}
this.#reducerCallInfo.delete(requestId);
const cb = this.#reducerCallbacks.get(requestId);
this.#reducerCallbacks.delete(requestId);
cb?.(result);
Expand Down Expand Up @@ -733,7 +768,11 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
* @param reducerName The name of the reducer to call
* @param argsSerializer The arguments to pass to the reducer
*/
callReducer(reducerName: string, argsBuffer: Uint8Array): Promise<void> {
callReducer(
reducerName: string,
argsBuffer: Uint8Array,
reducerArgs?: object
): Promise<void> {
const { promise, resolve, reject } = Promise.withResolvers<void>();
const requestId = this.#getNextRequestId();
const message = ClientMessage.CallReducer({
Expand All @@ -743,11 +782,24 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
flags: 0,
});
this.#sendMessage(message);
if (reducerArgs) {
this.#reducerCallInfo.set(requestId, {
name: reducerName,
args: reducerArgs,
});
}
this.#reducerCallbacks.set(requestId, result => {
if (result.tag === 'Ok' || result.tag === 'OkEmpty') {
resolve();
} else {
reject(result.value);
if (result.tag === 'Err') {
/// Interpret the user-returned error as a string.
const reader = new BinaryReader(result.value);
const errorString = reader.readString();
reject(errorString);
} else {
reject(result.value);
}
}
});
return promise;
Expand All @@ -768,7 +820,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
const writer = new BinaryWriter(1024);
this.#reducerArgsSerializers[reducerName].serialize(writer, params);
const argsBuffer = writer.getBuffer();
return this.callReducer(reducerName, argsBuffer);
return this.callReducer(reducerName, argsBuffer, params);
}

/**
Expand Down
11 changes: 10 additions & 1 deletion crates/bindings-typescript/src/sdk/websocket_test_adapter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { BinaryWriter, type Infer } from '../';
import { BinaryReader, BinaryWriter, type Infer } from '../';
import ClientMessageSerde from './client_api/client_message_type';
import ServerMessage from './client_api/server_message_type';
import type { ClientMessage } from './client_api/types';

class WebsocketTestAdapter {
onclose: any;
Expand All @@ -9,14 +11,21 @@ class WebsocketTestAdapter {
onerror: any;

messageQueue: any[];
outgoingMessages: ClientMessage[];
closed: boolean;

constructor() {
this.messageQueue = [];
this.outgoingMessages = [];
this.closed = false;
}

send(message: any): void {
const parsedMessage = ClientMessageSerde.deserialize(
new BinaryReader(message)
);
this.outgoingMessages.push(parsedMessage);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're just accumulating messages here? What's the purpose of this?

// console.ClientMessageSerde.deserialize(message);
this.messageQueue.push(message);
}

Expand Down
20 changes: 16 additions & 4 deletions crates/bindings-typescript/test-app/src/module_bindings/index.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading