Add more tests for typescript client and fix some bugs #4307
base: master
Changes from all commits: 59ddfc4, 5793215, 692198c, d92b6f4, ff77757, 9578e41, 98cb84f, bdfa414, bbe99d6, dac97a0, deff649
DbConnectionImpl:

```diff
@@ -61,6 +61,7 @@ import type { RowType, UntypedTableDef } from '../lib/table.ts';
 import { toCamelCase } from '../lib/util.ts';
 import type { ProceduresView } from './procedures.ts';
 import type { Values } from '../lib/type_util.ts';
+import type { TransactionUpdate } from './client_api/types.ts';
 
 export {
   DbConnectionBuilder,
```
```diff
@@ -154,6 +155,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
     number,
     (result: ReducerResultMessage['result']) => void
   >();
+  #reducerCallInfo = new Map<number, { name: string; args: object }>();
   #procedureCallbacks = new Map<number, ProcedureCallback>();
   #rowDeserializers: Record<string, Deserializer<any>>;
   #reducerArgsSerializers: Record<
```
```diff
@@ -313,7 +315,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
       const writer = new BinaryWriter(1024);
       serializeArgs(writer, params);
       const argsBuffer = writer.getBuffer();
-      return this.callReducer(reducerName, argsBuffer);
+      return this.callReducer(reducerName, argsBuffer, params);
     };
   }
 
```
```diff
@@ -409,7 +411,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
       ClientMessage.Unsubscribe({
         querySetId: { id: querySetId },
         requestId,
-        flags: UnsubscribeFlags.Default,
+        flags: UnsubscribeFlags.SendDroppedRows,
       })
     );
   }
```
```diff
@@ -460,16 +462,17 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
     return rows;
   }
 
+  // Take a bunch of table updates and ensure that there is at most one update per table.
   #mergeTableUpdates(
     updates: CacheTableUpdate<UntypedTableDef>[]
   ): CacheTableUpdate<UntypedTableDef>[] {
     const merged = new Map<string, Operation[]>();
     for (const update of updates) {
       const ops = merged.get(update.tableName);
       if (ops) {
-        ops.push(...update.operations);
+        for (const op of update.operations) ops.push(op);
       } else {
-        merged.set(update.tableName, [...update.operations]);
+        merged.set(update.tableName, update.operations.slice());
       }
     }
     return Array.from(merged, ([tableName, operations]) => ({
```
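The move away from spread syntax here is presumably defensive: spreading an array into `push(...)` passes every element as a separate function argument, which can overflow the call stack for very large operation arrays. A minimal illustration of the difference (not from this PR):

```ts
const big = new Array<number>(500_000).fill(0);
const target: number[] = [];

// Spreading passes each element as an individual argument and can throw
// "RangeError: Maximum call stack size exceeded" for large arrays:
// target.push(...big);

// These forms work regardless of array length, as in the change above:
for (const x of big) target.push(x);
const copy = big.slice(); // shallow copy without spreading
```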
```diff
@@ -577,6 +580,24 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
     return pendingCallbacks;
   }
 
+  #applyTransactionUpdates(
+    eventContext: EventContextInterface<RemoteModule>,
+    tu: TransactionUpdate
+  ): PendingCallback[] {
+    const all_updates: CacheTableUpdate<UntypedTableDef>[] = [];
+    for (const querySetUpdate of tu.querySets) {
+      const tableUpdates = this.#querySetUpdateToTableUpdates(querySetUpdate);
+      for (const update of tableUpdates) {
+        all_updates.push(update);
+      }
+      // TODO: When we have per-query storage, we will want to apply the per-query events here.
+    }
+    return this.#applyTableUpdates(
+      this.#mergeTableUpdates(all_updates),
+      eventContext
+    );
+  }
+
   async #processMessage(data: Uint8Array): Promise<void> {
     const serverMessage = ServerMessage.deserialize(new BinaryReader(data));
     switch (serverMessage.tag) {
```

> Contributor: NIT: use …
```diff
@@ -664,30 +685,44 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
       case 'TransactionUpdate': {
         const event: Event<never> = { tag: 'UnknownTransaction' };
         const eventContext = this.#makeEventContext(event);
-        for (const querySetUpdate of serverMessage.value.querySets) {
-          const tableUpdates =
-            this.#querySetUpdateToTableUpdates(querySetUpdate);
-          const callbacks = this.#applyTableUpdates(tableUpdates, eventContext);
-          for (const callback of callbacks) {
-            callback.cb();
-          }
+        const callbacks = this.#applyTransactionUpdates(
+          eventContext,
+          serverMessage.value
+        );
+        for (const callback of callbacks) {
+          callback.cb();
         }
         break;
       }
       case 'ReducerResult': {
         const { requestId, result } = serverMessage.value;
 
         if (result.tag === 'Ok') {
-          const tableUpdates = result.value.transactionUpdate.querySets.flatMap(
-            qs => this.#querySetUpdateToTableUpdates(qs)
+          const reducerInfo = this.#reducerCallInfo.get(requestId);
+          const event: Event<any> = reducerInfo
+            ? {
+                tag: 'Reducer',
+                value: {
+                  timestamp: serverMessage.value.timestamp,
+                  outcome: result,
+                  reducer: {
+                    name: reducerInfo.name,
+                    args: reducerInfo.args,
+                  },
+                },
+              }
+            : { tag: 'UnknownTransaction' };
+          const eventContext = this.#makeEventContext(event as any);
+
+          const callbacks = this.#applyTransactionUpdates(
+            eventContext,
+            result.value.transactionUpdate
          );
-          const event: Event<never> = { tag: 'UnknownTransaction' };
-          const eventContext = this.#makeEventContext(event);
-          const callbacks = this.#applyTableUpdates(tableUpdates, eventContext);
           for (const callback of callbacks) {
             callback.cb();
           }
         }
+        this.#reducerCallInfo.delete(requestId);
         const cb = this.#reducerCallbacks.get(requestId);
         this.#reducerCallbacks.delete(requestId);
         cb?.(result);
```
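With the request's name and args recorded in `#reducerCallInfo`, the row callbacks fired for a `ReducerResult` can now see a `'Reducer'` event instead of always seeing `'UnknownTransaction'`. A hedged sketch of what that enables, assuming the generated table handles expose `onInsert` callbacks that receive this event context (the table and reducer names are illustrative, not from this PR):

```ts
conn.db.user.onInsert((ctx, row) => {
  if (ctx.event.tag === 'Reducer') {
    // The event now carries the reducer's name and the args the caller
    // passed, as populated via #reducerCallInfo above.
    console.log(`insert caused by reducer ${ctx.event.value.reducer.name}`, row);
  }
});
```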
```diff
@@ -733,7 +768,11 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
    * @param reducerName The name of the reducer to call
    * @param argsSerializer The arguments to pass to the reducer
    */
-  callReducer(reducerName: string, argsBuffer: Uint8Array): Promise<void> {
+  callReducer(
+    reducerName: string,
+    argsBuffer: Uint8Array,
+    reducerArgs?: object
+  ): Promise<void> {
     const { promise, resolve, reject } = Promise.withResolvers<void>();
     const requestId = this.#getNextRequestId();
     const message = ClientMessage.CallReducer({
```
```diff
@@ -743,11 +782,24 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
       flags: 0,
     });
     this.#sendMessage(message);
+    if (reducerArgs) {
+      this.#reducerCallInfo.set(requestId, {
+        name: reducerName,
+        args: reducerArgs,
+      });
+    }
     this.#reducerCallbacks.set(requestId, result => {
       if (result.tag === 'Ok' || result.tag === 'OkEmpty') {
         resolve();
       } else {
-        reject(result.value);
+        if (result.tag === 'Err') {
+          /// Interpret the user-returned error as a string.
+          const reader = new BinaryReader(result.value);
+          const errorString = reader.readString();
+          reject(errorString);
+        } else {
+          reject(result.value);
+        }
       }
     });
     return promise;
```
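Because the rejection value changes from the raw error bytes to a decoded string, code awaiting a reducer call now catches a readable message. A hedged sketch of the caller side (the reducer name and call shape are illustrative, not part of this diff):

```ts
try {
  // Hypothetical generated reducer call that the module rejects.
  await conn.reducers.setName({ name: '' });
} catch (err) {
  // Previously `err` was the Uint8Array from result.value; with this change
  // it is the string read back via BinaryReader.readString().
  console.error('reducer failed:', err);
}
```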
```diff
@@ -768,7 +820,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
     const writer = new BinaryWriter(1024);
     this.#reducerArgsSerializers[reducerName].serialize(writer, params);
     const argsBuffer = writer.getBuffer();
-    return this.callReducer(reducerName, argsBuffer);
+    return this.callReducer(reducerName, argsBuffer, params);
   }
 
   /**
```
WebsocketTestAdapter (test helper):

```diff
@@ -1,5 +1,7 @@
-import { BinaryWriter, type Infer } from '../';
+import { BinaryReader, BinaryWriter, type Infer } from '../';
+import ClientMessageSerde from './client_api/client_message_type';
 import ServerMessage from './client_api/server_message_type';
+import type { ClientMessage } from './client_api/types';
 
 class WebsocketTestAdapter {
   onclose: any;
```
```diff
@@ -9,14 +11,21 @@ class WebsocketTestAdapter {
   onerror: any;
 
   messageQueue: any[];
+  outgoingMessages: ClientMessage[];
   closed: boolean;
 
   constructor() {
     this.messageQueue = [];
+    this.outgoingMessages = [];
     this.closed = false;
   }
 
   send(message: any): void {
+    const parsedMessage = ClientMessageSerde.deserialize(
+      new BinaryReader(message)
+    );
+    this.outgoingMessages.push(parsedMessage);
+    // console.ClientMessageSerde.deserialize(message);
     this.messageQueue.push(message);
   }
 
```

> Contributor: We're just accumulating messages here? What's the purpose of this?
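One plausible use of `outgoingMessages`, consistent with the PR's goal of adding tests (the setup and assertion style below are assumptions, not part of this diff): a test can drive the connection through the adapter and then assert on the decoded `ClientMessage`s the client actually sent, rather than on raw bytes.

```ts
const ws = new WebsocketTestAdapter();
// ... build a DbConnection against `ws` and trigger a reducer call ...

// Each outgoing frame was decoded in send(), so the test can filter on
// structured messages (assuming ClientMessage is a tagged union like
// ServerMessage above).
const calls = ws.outgoingMessages.filter(m => m.tag === 'CallReducer');
expect(calls).toHaveLength(1);
```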
> Reviewer: What does this mean, and why are we now enabling it? I'm not so familiar with v2. I have a vague recollection that it's required because we don't have per-query storage, is that right?