From d494169fdd1222ad87d15ce3b7f5668cb257ab13 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 8 Apr 2026 13:34:58 -0700 Subject: [PATCH 1/2] v3 websocket protocol --- crates/client-api-messages/DEVELOP.md | 9 ++ .../examples/get_ws_schema_v3.rs | 13 ++ crates/client-api-messages/src/websocket.rs | 1 + .../client-api-messages/src/websocket/v3.rs | 28 ++++ crates/client-api/src/routes/subscribe.rs | 132 +++++++++++++----- crates/core/src/client.rs | 1 + crates/core/src/client/client_connection.rs | 3 +- crates/core/src/client/message_handlers.rs | 1 + crates/core/src/client/message_handlers_v2.rs | 8 ++ crates/core/src/client/message_handlers_v3.rs | 32 +++++ crates/core/src/client/messages.rs | 62 +++++--- .../subscription/module_subscription_actor.rs | 2 +- 12 files changed, 237 insertions(+), 55 deletions(-) create mode 100644 crates/client-api-messages/examples/get_ws_schema_v3.rs create mode 100644 crates/client-api-messages/src/websocket/v3.rs create mode 100644 crates/core/src/client/message_handlers_v3.rs diff --git a/crates/client-api-messages/DEVELOP.md b/crates/client-api-messages/DEVELOP.md index 47868d4d3ce..48341bd80aa 100644 --- a/crates/client-api-messages/DEVELOP.md +++ b/crates/client-api-messages/DEVELOP.md @@ -19,3 +19,12 @@ spacetime generate -p spacetimedb-cli --lang \ --out-dir \ --module-def ws_schema_v2.json ``` + +For the v3 WebSocket transport schema: + +```sh +cargo run --example get_ws_schema_v3 > ws_schema_v3.json +spacetime generate -p spacetimedb-cli --lang \ + --out-dir \ + --module-def ws_schema_v3.json +``` diff --git a/crates/client-api-messages/examples/get_ws_schema_v3.rs b/crates/client-api-messages/examples/get_ws_schema_v3.rs new file mode 100644 index 00000000000..b4a752a5664 --- /dev/null +++ b/crates/client-api-messages/examples/get_ws_schema_v3.rs @@ -0,0 +1,13 @@ +use spacetimedb_client_api_messages::websocket::v3::{ClientFrame, ServerFrame}; +use spacetimedb_lib::ser::serde::SerializeWrapper; +use spacetimedb_lib::{RawModuleDef, RawModuleDefV8}; + +fn main() -> Result<(), serde_json::Error> { + let module = RawModuleDefV8::with_builder(|module| { + module.add_type::(); + module.add_type::(); + }); + let module = RawModuleDef::V8BackCompat(module); + + serde_json::to_writer(std::io::stdout().lock(), SerializeWrapper::from_ref(&module)) +} diff --git a/crates/client-api-messages/src/websocket.rs b/crates/client-api-messages/src/websocket.rs index 0935d2e3c55..14ec394670f 100644 --- a/crates/client-api-messages/src/websocket.rs +++ b/crates/client-api-messages/src/websocket.rs @@ -17,3 +17,4 @@ pub mod common; pub mod v1; pub mod v2; +pub mod v3; diff --git a/crates/client-api-messages/src/websocket/v3.rs b/crates/client-api-messages/src/websocket/v3.rs new file mode 100644 index 00000000000..5be37768299 --- /dev/null +++ b/crates/client-api-messages/src/websocket/v3.rs @@ -0,0 +1,28 @@ +use bytes::Bytes; +pub use spacetimedb_sats::SpacetimeType; + +pub const BIN_PROTOCOL: &str = "v3.bsatn.spacetimedb"; + +/// Transport envelopes sent by the client over the v3 websocket protocol. +/// +/// The inner bytes are BSATN-encoded v2 [`crate::websocket::v2::ClientMessage`] values. +#[derive(SpacetimeType, Debug)] +#[sats(crate = spacetimedb_lib)] +pub enum ClientFrame { + /// A single logical client message. + Single(Bytes), + /// Multiple logical client messages that should be processed in-order. + Batch(Box<[Bytes]>), +} + +/// Transport envelopes sent by the server over the v3 websocket protocol. +/// +/// The inner bytes are BSATN-encoded v2 [`crate::websocket::v2::ServerMessage`] values. +#[derive(SpacetimeType, Debug)] +#[sats(crate = spacetimedb_lib)] +pub enum ServerFrame { + /// A single logical server message. + Single(Bytes), + /// Multiple logical server messages that should be processed in-order. + Batch(Box<[Bytes]>), +} diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index d1bb1d2b11f..868e7425eda 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -23,8 +23,8 @@ use prometheus::{Histogram, IntGauge}; use scopeguard::{defer, ScopeGuard}; use serde::Deserialize; use spacetimedb::client::messages::{ - serialize, serialize_v2, IdentityTokenMessage, InUseSerializeBuffer, SerializeBuffer, SwitchedServerMessage, - ToProtocol, + serialize, serialize_v2, serialize_v3, IdentityTokenMessage, InUseSerializeBuffer, SerializeBuffer, + SwitchedServerMessage, ToProtocol, }; use spacetimedb::client::{ ClientActorId, ClientConfig, ClientConnection, ClientConnectionReceiver, DataMessage, MessageExecutionError, @@ -38,6 +38,7 @@ use spacetimedb::worker_metrics::WORKER_METRICS; use spacetimedb::Identity; use spacetimedb_client_api_messages::websocket::v1 as ws_v1; use spacetimedb_client_api_messages::websocket::v2 as ws_v2; +use spacetimedb_client_api_messages::websocket::v3 as ws_v3; use spacetimedb_datastore::execution_context::WorkloadType; use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl}; use tokio::sync::{mpsc, watch}; @@ -62,6 +63,8 @@ pub const TEXT_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::TEXT_PROT pub const BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::BIN_PROTOCOL); #[allow(clippy::declare_interior_mutable_const)] pub const V2_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v2::BIN_PROTOCOL); +#[allow(clippy::declare_interior_mutable_const)] +pub const V3_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v3::BIN_PROTOCOL); pub trait HasWebSocketOptions { fn websocket_options(&self) -> WebSocketOptions; @@ -101,7 +104,7 @@ fn resolve_confirmed_reads_default(version: WsVersion, confirmed: Option) } match version { WsVersion::V1 => false, - WsVersion::V2 => crate::DEFAULT_CONFIRMED_READS, + WsVersion::V2 | WsVersion::V3 => crate::DEFAULT_CONFIRMED_READS, } } @@ -151,6 +154,13 @@ where } let (res, ws_upgrade, protocol) = ws.select_protocol([ + ( + V3_BIN_PROTOCOL, + NegotiatedProtocol { + protocol: Protocol::Binary, + version: WsVersion::V3, + }, + ), ( V2_BIN_PROTOCOL, NegotiatedProtocol { @@ -284,7 +294,7 @@ where }; client.send_message(None, OutboundMessage::V1(message.into())) } - WsVersion::V2 => { + WsVersion::V2 | WsVersion::V3 => { let message = ws_v2::ServerMessage::InitialConnection(ws_v2::InitialConnection { identity: client_identity, connection_id, @@ -1293,10 +1303,15 @@ async fn ws_encode_task( // copied to the wire. Since we don't know when that will happen, we prepare // for a few messages to be in-flight, i.e. encoded but not yet sent. const BUF_POOL_CAPACITY: usize = 16; + let binary_message_serializer = match config.version { + WsVersion::V1 => None, + WsVersion::V2 => Some(serialize_v2 as BinarySerializeFn), + WsVersion::V3 => Some(serialize_v3 as BinarySerializeFn), + }; let buf_pool = ArrayQueue::new(BUF_POOL_CAPACITY); let mut in_use_bufs: Vec> = Vec::with_capacity(BUF_POOL_CAPACITY); - while let Some(message) = messages.recv().await { + 'send: while let Some(message) = messages.recv().await { // Drop serialize buffers with no external referent, // returning them to the pool. in_use_bufs.retain(|in_use| !in_use.is_unique()); @@ -1306,16 +1321,22 @@ async fn ws_encode_task( let in_use_buf = match message { OutboundWsMessage::Error(message) => { - if config.version == WsVersion::V2 { - log::error!("dropping v1 error message sent to a v2 client: {:?}", message); + if config.version != WsVersion::V1 { + log::error!( + "dropping v1 error message sent to a binary websocket client: {:?}", + message + ); continue; } - let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await; - metrics.report(None, None, stats); - if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { - break; - } - + let Ok(in_use) = ws_forward_frames( + &metrics, + &outgoing_frames, + None, + None, + ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await, + ) else { + break 'send; + }; in_use } OutboundWsMessage::Message(message) => { @@ -1323,38 +1344,47 @@ async fn ws_encode_task( let num_rows = message.num_rows(); match message { OutboundMessage::V2(server_message) => { - if config.version != WsVersion::V2 { + if config.version == WsVersion::V1 { log::error!("dropping v2 message on v1 connection"); continue; } - let (stats, in_use, mut frames) = - ws_encode_message_v2(config, buf, server_message, false, &bsatn_rlb_pool).await; - metrics.report(workload, num_rows, stats); - if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { - break; - } - + let Ok(in_use) = ws_forward_frames( + &metrics, + &outgoing_frames, + workload, + num_rows, + ws_encode_binary_message( + config, + buf, + server_message, + binary_message_serializer.expect("v2 message should not be sent on a v1 connection"), + false, + &bsatn_rlb_pool, + ) + .await, + ) else { + break 'send; + }; in_use } OutboundMessage::V1(message) => { - if config.version == WsVersion::V2 { - log::error!( - "dropping v1 message for v2 connection until v2 serialization is implemented: {:?}", - message - ); + if config.version != WsVersion::V1 { + log::error!("dropping v1 message for a binary websocket connection: {:?}", message); continue; } let is_large = num_rows.is_some_and(|n| n > 1024); - let (stats, in_use, mut frames) = - ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await; - metrics.report(workload, num_rows, stats); - if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { - break; - } - + let Ok(in_use) = ws_forward_frames( + &metrics, + &outgoing_frames, + workload, + num_rows, + ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await, + ) else { + break 'send; + }; in_use } } @@ -1370,6 +1400,24 @@ async fn ws_encode_task( } } +/// Reports encode metrics for an already-encoded message and forwards all of +/// its frames to the websocket send task. +fn ws_forward_frames( + metrics: &SendMetrics, + outgoing_frames: &mpsc::UnboundedSender, + workload: Option, + num_rows: Option, + encoded: (EncodeMetrics, InUseSerializeBuffer, I), +) -> Result> +where + I: Iterator, +{ + let (stats, in_use, frames) = encoded; + metrics.report(workload, num_rows, stats); + frames.into_iter().try_for_each(|frame| outgoing_frames.send(frame))?; + Ok(in_use) +} + /// Some stats about serialization and compression. /// /// Returned by [`ws_encode_message`]. @@ -1443,21 +1491,29 @@ async fn ws_encode_message( (metrics, msg_alloc, frames) } -#[allow(dead_code, unused_variables)] -async fn ws_encode_message_v2( +type BinarySerializeFn = fn( + &BsatnRowListBuilderPool, + SerializeBuffer, + ws_v2::ServerMessage, + ws_v1::Compression, +) -> (InUseSerializeBuffer, Bytes); + +async fn ws_encode_binary_message( config: ClientConfig, buf: SerializeBuffer, message: ws_v2::ServerMessage, + serialize_message: BinarySerializeFn, is_large_message: bool, bsatn_rlb_pool: &BsatnRowListBuilderPool, ) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator + use<>) { let start = Instant::now(); + let compression = config.compression; let (in_use, data) = if is_large_message { let bsatn_rlb_pool = bsatn_rlb_pool.clone(); - spawn_rayon(move || serialize_v2(&bsatn_rlb_pool, buf, message, config.compression)).await + spawn_rayon(move || serialize_message(&bsatn_rlb_pool, buf, message, compression)).await } else { - serialize_v2(bsatn_rlb_pool, buf, message, config.compression) + serialize_message(bsatn_rlb_pool, buf, message, compression) }; let metrics = EncodeMetrics { @@ -2298,9 +2354,11 @@ mod tests { #[test] fn confirmed_reads_default_depends_on_ws_version() { + assert!(resolve_confirmed_reads_default(WsVersion::V3, None)); assert!(resolve_confirmed_reads_default(WsVersion::V2, None)); assert!(!resolve_confirmed_reads_default(WsVersion::V1, None)); assert!(resolve_confirmed_reads_default(WsVersion::V1, Some(true))); + assert!(!resolve_confirmed_reads_default(WsVersion::V3, Some(false))); assert!(!resolve_confirmed_reads_default(WsVersion::V2, Some(false))); } diff --git a/crates/core/src/client.rs b/crates/core/src/client.rs index cad4f79adcf..4411192c625 100644 --- a/crates/core/src/client.rs +++ b/crates/core/src/client.rs @@ -7,6 +7,7 @@ pub mod consume_each_list; mod message_handlers; mod message_handlers_v1; mod message_handlers_v2; +mod message_handlers_v3; pub mod messages; pub use client_connection::{ diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 6fb8d8e1623..0a7a7f1a11b 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -47,6 +47,7 @@ pub enum Protocol { pub enum WsVersion { V1, V2, + V3, } impl Protocol { @@ -384,7 +385,7 @@ impl ClientConnectionSender { debug_assert!( matches!( (&self.config.version, &message), - (WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2, OutboundMessage::V2(_)) + (WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2 | WsVersion::V3, OutboundMessage::V2(_)) ), "attempted to send message variant that does not match client websocket version" ); diff --git a/crates/core/src/client/message_handlers.rs b/crates/core/src/client/message_handlers.rs index 76f5fa53afa..fb85730c11c 100644 --- a/crates/core/src/client/message_handlers.rs +++ b/crates/core/src/client/message_handlers.rs @@ -23,5 +23,6 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst match client.config.version { WsVersion::V1 => super::message_handlers_v1::handle(client, message, timer).await, WsVersion::V2 => super::message_handlers_v2::handle(client, message, timer).await, + WsVersion::V3 => super::message_handlers_v3::handle(client, message, timer).await, } } diff --git a/crates/core/src/client/message_handlers_v2.rs b/crates/core/src/client/message_handlers_v2.rs index 5dd2f80d01b..2db523e472d 100644 --- a/crates/core/src/client/message_handlers_v2.rs +++ b/crates/core/src/client/message_handlers_v2.rs @@ -20,6 +20,14 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst ))) } }; + handle_decoded_message(client, message, timer).await +} + +pub(super) async fn handle_decoded_message( + client: &ClientConnection, + message: ws_v2::ClientMessage, + timer: Instant, +) -> Result<(), MessageHandleError> { let module = client.module(); let mod_info = module.info(); let mod_metrics = &mod_info.metrics; diff --git a/crates/core/src/client/message_handlers_v3.rs b/crates/core/src/client/message_handlers_v3.rs new file mode 100644 index 00000000000..696e7337ed0 --- /dev/null +++ b/crates/core/src/client/message_handlers_v3.rs @@ -0,0 +1,32 @@ +use super::{ClientConnection, DataMessage, MessageHandleError}; +use serde::de::Error as _; +use spacetimedb_client_api_messages::websocket::{v2 as ws_v2, v3 as ws_v3}; +use spacetimedb_lib::bsatn; +use std::time::Instant; + +pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Instant) -> Result<(), MessageHandleError> { + client.observe_websocket_request_message(&message); + let frame = match message { + DataMessage::Binary(message_buf) => bsatn::from_slice::(&message_buf)?, + DataMessage::Text(_) => { + return Err(MessageHandleError::TextDecode(serde_json::Error::custom( + "v3 websocket does not support text messages", + ))) + } + }; + + match frame { + ws_v3::ClientFrame::Single(message) => { + let message = bsatn::from_slice::(&message)?; + super::message_handlers_v2::handle_decoded_message(client, message, timer).await?; + } + ws_v3::ClientFrame::Batch(messages) => { + for message in messages { + let message = bsatn::from_slice::(&message)?; + super::message_handlers_v2::handle_decoded_message(client, message, timer).await?; + } + } + } + + Ok(()) +} diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index ed65e092d0e..38c5fadb260 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -10,6 +10,7 @@ use derive_more::From; use spacetimedb_client_api_messages::websocket::common::{self as ws_common, RowListLen as _}; use spacetimedb_client_api_messages::websocket::v1::{self as ws_v1}; use spacetimedb_client_api_messages::websocket::v2 as ws_v2; +use spacetimedb_client_api_messages::websocket::v3 as ws_v3; use spacetimedb_datastore::execution_context::WorkloadType; use spacetimedb_lib::identity::RequestId; use spacetimedb_lib::ser::serde::SerializeWrapper; @@ -97,6 +98,20 @@ impl SerializeBuffer { } } +fn finalize_binary_serialize_buffer( + buffer: SerializeBuffer, + uncompressed_len: usize, + compression: ws_v1::Compression, +) -> (InUseSerializeBuffer, Bytes) { + match decide_compression(uncompressed_len, compression) { + ws_v1::Compression::None => buffer.uncompressed(), + ws_v1::Compression::Brotli => { + buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress) + } + ws_v1::Compression::Gzip => buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress), + } +} + type BytesMutWriter<'a> = bytes::buf::Writer<&'a mut BytesMut>; pub enum InUseSerializeBuffer { @@ -159,21 +174,14 @@ pub fn serialize( let srv_msg = buffer.write_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_NONE, |w| { bsatn::to_writer(w.into_inner(), &msg).unwrap() }); + let srv_msg_len = srv_msg.len(); // At this point, we no longer have a use for `msg`, // so try to reclaim its buffers. msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer)); // Conditionally compress the message. - let (in_use, msg_bytes) = match decide_compression(srv_msg.len(), config.compression) { - ws_v1::Compression::None => buffer.uncompressed(), - ws_v1::Compression::Brotli => { - buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress) - } - ws_v1::Compression::Gzip => { - buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress) - } - }; + let (in_use, msg_bytes) = finalize_binary_serialize_buffer(buffer, srv_msg_len, config.compression); (in_use, msg_bytes.into()) } } @@ -192,18 +200,40 @@ pub fn serialize_v2( let srv_msg = buffer.write_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_NONE, |w| { bsatn::to_writer(w.into_inner(), &msg).expect("should be able to bsatn encode v2 message"); }); + let srv_msg_len = srv_msg.len(); // At this point, we no longer have a use for `msg`, // so try to reclaim its buffers. msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer)); - match decide_compression(srv_msg.len(), compression) { - ws_v1::Compression::None => buffer.uncompressed(), - ws_v1::Compression::Brotli => { - buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress) - } - ws_v1::Compression::Gzip => buffer.compress_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress), - } + finalize_binary_serialize_buffer(buffer, srv_msg_len, compression) +} + +/// Serialize `msg` into a [`DataMessage`] containing a [`ws_v3::ServerFrame::Single`] +/// whose payload is a BSATN-encoded [`ws_v2::ServerMessage`]. +/// +/// This mirrors the v2 framing by prepending the compression tag and applying +/// conditional compression when configured. +pub fn serialize_v3( + bsatn_rlb_pool: &BsatnRowListBuilderPool, + mut buffer: SerializeBuffer, + msg: ws_v2::ServerMessage, + compression: ws_v1::Compression, +) -> (InUseSerializeBuffer, Bytes) { + let mut inner = BytesMut::with_capacity(SERIALIZE_BUFFER_INIT_CAP); + bsatn::to_writer((&mut inner).writer().into_inner(), &msg).expect("should be able to bsatn encode v2 message"); + + // At this point, we no longer have a use for `msg`, + // so try to reclaim its buffers. + msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer)); + + let frame = ws_v3::ServerFrame::Single(inner.freeze()); + let srv_msg = buffer.write_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_NONE, |w| { + bsatn::to_writer(w.into_inner(), &frame).expect("should be able to bsatn encode v3 server frame"); + }); + let srv_msg_len = srv_msg.len(); + + finalize_binary_serialize_buffer(buffer, srv_msg_len, compression) } #[derive(Debug, From)] diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 92f296f3b8c..4ab8b0c28a7 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1639,7 +1639,7 @@ impl ModuleSubscriptions { message, ); } - WsVersion::V2 => { + WsVersion::V2 | WsVersion::V3 => { if let Some(request_id) = event.request_id { self.send_reducer_failure_result_v2(client, &event, request_id); } From af5da7bf0b4f15c94cb42807e460cdd3d5e9b992 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Sun, 12 Apr 2026 06:03:22 -0700 Subject: [PATCH 2/2] Update typescript sdk to use v3 websocket api (#4762) # Description of Changes Adds TypeScript SDK support for the `v3.bsatn.spacetimedb` websocket API which was added in #4761. `v3` just adds batching on top of `v2`, so adding support for it just required the following: 1. The client negotiates the protocol with the server and falls back to `v2` if necessary 2. When `v3` is negotiated, same-tick outbound client messages are batched into a single ws frame 3. Inbound `v3` server frames are unwrapped into their inner v2 messages and processed in-order Some notes on the batching behavior: 1. Outbound `v3` frames are capped at `256 KiB` (unless a single message exceeds the limit) 2. The SDK sends one v3 frame per flush 3. If more messages remain queued after hitting the cap, it schedules a follow-up flush on a later task instead of draining in a tight loop, so that outbound ws work doesn't starve inbound ws work or any other event-loop tasks # API and ABI breaking changes None, `v2` fallback remains supported, so existing servers that only negotiate `v2` still work with the updated client. # Expected complexity level and risk 3 Constraints that must hold: 1. Same-tick sends should coalesce under `v3` 2. `v2` should remain unaffected 3. Capped batching should not starve inbound processing 4. Server frames containing multiple logical messages must be processed in-order 5. Handshake fallback to `v2` must remain correct # Testing Added tests for the fast path and fallback behavior --- .../src/sdk/client_api/v3.ts | 15 ++ .../src/sdk/db_connection_impl.ts | 162 ++++++++++++++---- .../src/sdk/websocket_decompress_adapter.ts | 7 +- .../src/sdk/websocket_protocols.ts | 25 +++ .../src/sdk/websocket_test_adapter.ts | 82 +++++++-- .../src/sdk/websocket_v3_frames.ts | 109 ++++++++++++ .../tests/db_connection.test.ts | 67 ++++++++ .../tests/websocket_v3_frames.test.ts | 25 +++ 8 files changed, 437 insertions(+), 55 deletions(-) create mode 100644 crates/bindings-typescript/src/sdk/client_api/v3.ts create mode 100644 crates/bindings-typescript/src/sdk/websocket_protocols.ts create mode 100644 crates/bindings-typescript/src/sdk/websocket_v3_frames.ts create mode 100644 crates/bindings-typescript/tests/websocket_v3_frames.test.ts diff --git a/crates/bindings-typescript/src/sdk/client_api/v3.ts b/crates/bindings-typescript/src/sdk/client_api/v3.ts new file mode 100644 index 00000000000..296080164ad --- /dev/null +++ b/crates/bindings-typescript/src/sdk/client_api/v3.ts @@ -0,0 +1,15 @@ +/* eslint-disable */ +/* tslint:disable */ +import { t as __t, type Infer as __Infer } from '../../lib/type_builders'; + +export const ClientFrame = __t.enum('ClientFrame', { + Single: __t.byteArray(), + Batch: __t.array(__t.byteArray()), +}); +export type ClientFrame = __Infer; + +export const ServerFrame = __t.enum('ServerFrame', { + Single: __t.byteArray(), + Batch: __t.array(__t.byteArray()), +}); +export type ServerFrame = __Infer; diff --git a/crates/bindings-typescript/src/sdk/db_connection_impl.ts b/crates/bindings-typescript/src/sdk/db_connection_impl.ts index b873b30bff6..da73546a6b9 100644 --- a/crates/bindings-typescript/src/sdk/db_connection_impl.ts +++ b/crates/bindings-typescript/src/sdk/db_connection_impl.ts @@ -1,7 +1,7 @@ import { ConnectionId, ProductBuilder, ProductType } from '../'; import { AlgebraicType, type ComparablePrimitive } from '../'; -import { BinaryReader } from '../'; -import { BinaryWriter } from '../'; +import BinaryReader from '../lib/binary_reader.ts'; +import BinaryWriter from '../lib/binary_writer.ts'; import { BsatnRowList, ClientMessage, @@ -60,6 +60,18 @@ import type { ProceduresView } from './procedures.ts'; import type { Values } from '../lib/type_util.ts'; import type { TransactionUpdate } from './client_api/types.ts'; import { InternalError, SenderError } from '../lib/errors.ts'; +import { + normalizeWsProtocol, + PREFERRED_WS_PROTOCOLS, + V2_WS_PROTOCOL, + V3_WS_PROTOCOL, + type NegotiatedWsProtocol, +} from './websocket_protocols'; +import { + countClientMessagesForV3Frame, + decodeServerMessagesV3, + encodeClientMessagesV3, +} from './websocket_v3_frames.ts'; export { DbConnectionBuilder, @@ -117,6 +129,9 @@ const CLIENT_MESSAGE_CALL_REDUCER_TAG = getClientMessageVariantTag('CallReducer'); const CLIENT_MESSAGE_CALL_PROCEDURE_TAG = getClientMessageVariantTag('CallProcedure'); +// Keep individual v3 frames bounded so one burst does not monopolize the send +// path or create very large websocket writes. +const MAX_V3_OUTBOUND_FRAME_BYTES = 256 * 1024; export class DbConnectionImpl implements DbContext @@ -172,6 +187,8 @@ export class DbConnectionImpl #inboundQueueOffset = 0; #isDrainingInboundQueue = false; #outboundQueue: Uint8Array[] = []; + #isOutboundFlushScheduled = false; + #negotiatedWsProtocol: NegotiatedWsProtocol = V2_WS_PROTOCOL; #subscriptionManager = new SubscriptionManager(); #remoteModule: RemoteModule; #reducerCallbacks = new Map< @@ -198,6 +215,7 @@ export class DbConnectionImpl #sourceNameToTableDef: Record>; #messageReader = new BinaryReader(new Uint8Array()); #rowListReader = new BinaryReader(new Uint8Array()); + #clientFrameEncoder = new BinaryWriter(1024); #boundSubscriptionBuilder!: () => SubscriptionBuilderImpl; #boundDisconnect!: () => void; @@ -296,7 +314,7 @@ export class DbConnectionImpl this.wsPromise = createWSFn({ url, nameOrAddress, - wsProtocol: 'v2.bsatn.spacetimedb', + wsProtocol: [...PREFERRED_WS_PROTOCOLS], authToken: token, compression: compression, lightMode: lightMode, @@ -595,23 +613,87 @@ export class DbConnectionImpl } #flushOutboundQueue(wsResolved: WebsocketAdapter): void { + if (this.#negotiatedWsProtocol === V3_WS_PROTOCOL) { + this.#flushOutboundQueueV3(wsResolved); + return; + } + this.#flushOutboundQueueV2(wsResolved); + } + + #flushOutboundQueueV2(wsResolved: WebsocketAdapter): void { const pending = this.#outboundQueue.splice(0); for (const message of pending) { wsResolved.send(message); } } + #flushOutboundQueueV3(wsResolved: WebsocketAdapter): void { + if (this.#outboundQueue.length === 0) { + return; + } + + // Emit at most one bounded frame per flush. If more encoded v2 messages + // remain in the queue, they are sent by a later scheduled flush so inbound + // traffic and other tasks get a chance to run between websocket writes. + const batchSize = countClientMessagesForV3Frame( + this.#outboundQueue, + MAX_V3_OUTBOUND_FRAME_BYTES + ); + const pending = this.#outboundQueue.splice(0, batchSize); + wsResolved.send(encodeClientMessagesV3(this.#clientFrameEncoder, pending)); + + if (this.#outboundQueue.length > 0) { + this.#scheduleDeferredOutboundFlush(); + } + } + + #scheduleOutboundFlush(): void { + this.#scheduleOutboundFlushWith('microtask'); + } + + #scheduleDeferredOutboundFlush(): void { + this.#scheduleOutboundFlushWith('next-task'); + } + + #scheduleOutboundFlushWith(schedule: 'microtask' | 'next-task'): void { + if (this.#isOutboundFlushScheduled) { + return; + } + + this.#isOutboundFlushScheduled = true; + const flush = () => { + this.#isOutboundFlushScheduled = false; + if (this.ws && this.isActive) { + this.#flushOutboundQueue(this.ws); + } + }; + + // The first v3 flush stays on the current turn so same-tick sends coalesce. + // Follow-up flushes after a size-capped frame yield to the next task so we + // do not sit in a tight send loop while inbound websocket work is waiting. + if (schedule === 'next-task') { + setTimeout(flush, 0); + } else { + queueMicrotask(flush); + } + } + #reducerArgsEncoder = new BinaryWriter(1024); #clientMessageEncoder = new BinaryWriter(1024); #sendEncodedMessage(encoded: Uint8Array, describe: () => string): void { + stdbLogger('trace', describe); if (this.ws && this.isActive) { - if (this.#outboundQueue.length) this.#flushOutboundQueue(this.ws); + if (this.#negotiatedWsProtocol === V2_WS_PROTOCOL) { + if (this.#outboundQueue.length) this.#flushOutboundQueue(this.ws); + this.ws.send(encoded); + return; + } - stdbLogger('trace', describe); - this.ws.send(encoded); + this.#outboundQueue.push(encoded.slice()); + this.#scheduleOutboundFlush(); } else { - stdbLogger('trace', describe); - // use slice() to copy, in case the clientMessageEncoder's buffer gets used + // Use slice() to copy, in case the clientMessageEncoder's buffer gets reused + // before the connection opens or before a v3 microbatch flush runs. this.#outboundQueue.push(encoded.slice()); } } @@ -681,6 +763,9 @@ export class DbConnectionImpl * Handles WebSocket onOpen event. */ #handleOnOpen(): void { + if (this.ws) { + this.#negotiatedWsProtocol = normalizeWsProtocol(this.ws.protocol); + } this.isActive = true; if (this.ws) { this.#flushOutboundQueue(this.ws); @@ -728,7 +813,17 @@ export class DbConnectionImpl ); } - #processMessage(data: Uint8Array): void { + #dispatchPendingCallbacks(callbacks: readonly PendingCallback[]): void { + stdbLogger( + 'trace', + () => `Calling ${callbacks.length} triggered row callbacks` + ); + for (const callback of callbacks) { + callback.cb(); + } + } + + #processV2Message(data: Uint8Array): void { const reader = this.#messageReader; reader.reset(data); const serverMessage = ServerMessage.deserialize(reader); @@ -769,13 +864,7 @@ export class DbConnectionImpl const callbacks = this.#applyTableUpdates(tableUpdates, eventContext); const { event: _, ...subscriptionEventContext } = eventContext; subscription.emitter.emit('applied', subscriptionEventContext); - stdbLogger( - 'trace', - () => `Calling ${callbacks.length} triggered row callbacks` - ); - for (const callback of callbacks) { - callback.cb(); - } + this.#dispatchPendingCallbacks(callbacks); break; } case 'UnsubscribeApplied': { @@ -801,13 +890,7 @@ export class DbConnectionImpl const { event: _, ...subscriptionEventContext } = eventContext; subscription.emitter.emit('end', subscriptionEventContext); this.#subscriptionManager.subscriptions.delete(querySetId); - stdbLogger( - 'trace', - () => `Calling ${callbacks.length} triggered row callbacks` - ); - for (const callback of callbacks) { - callback.cb(); - } + this.#dispatchPendingCallbacks(callbacks); break; } case 'SubscriptionError': { @@ -861,13 +944,7 @@ export class DbConnectionImpl eventContext, serverMessage.value ); - stdbLogger( - 'trace', - () => `Calling ${callbacks.length} triggered row callbacks` - ); - for (const callback of callbacks) { - callback.cb(); - } + this.#dispatchPendingCallbacks(callbacks); break; } case 'ReducerResult': { @@ -899,13 +976,7 @@ export class DbConnectionImpl eventContext, result.value.transactionUpdate ); - stdbLogger( - 'trace', - () => `Calling ${callbacks.length} triggered row callbacks` - ); - for (const callback of callbacks) { - callback.cb(); - } + this.#dispatchPendingCallbacks(callbacks); } this.#reducerCallInfo.delete(requestId); const cb = this.#reducerCallbacks.get(requestId); @@ -934,6 +1005,23 @@ export class DbConnectionImpl } } + #processMessage(data: Uint8Array): void { + if (this.#negotiatedWsProtocol !== V3_WS_PROTOCOL) { + this.#processV2Message(data); + return; + } + + const messages = decodeServerMessagesV3(this.#messageReader, data); + stdbLogger( + 'trace', + () => + `Processing server frame: ${messages.length === 1 ? 'single' : `batch(${messages.length})`}` + ); + for (const message of messages) { + this.#processV2Message(message); + } + } + /** * Handles WebSocket onMessage event. * @param wsMessage MessageEvent object. diff --git a/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts b/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts index 40157393dd1..b5db13a8149 100644 --- a/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts +++ b/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts @@ -2,6 +2,7 @@ import { decompress } from './decompress'; import { resolveWS } from './ws'; export interface WebsocketAdapter { + readonly protocol: string; send(msg: Uint8Array): void; close(): void; @@ -12,6 +13,10 @@ export interface WebsocketAdapter { } export class WebsocketDecompressAdapter implements WebsocketAdapter { + get protocol(): string { + return this.#ws.protocol; + } + set onclose(handler: (ev: CloseEvent) => void) { this.#ws.onclose = handler; } @@ -73,7 +78,7 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter { confirmedReads, }: { url: URL; - wsProtocol: string; + wsProtocol: string | string[]; nameOrAddress: string; authToken?: string; compression: 'gzip' | 'none'; diff --git a/crates/bindings-typescript/src/sdk/websocket_protocols.ts b/crates/bindings-typescript/src/sdk/websocket_protocols.ts new file mode 100644 index 00000000000..2d6598143e9 --- /dev/null +++ b/crates/bindings-typescript/src/sdk/websocket_protocols.ts @@ -0,0 +1,25 @@ +import { stdbLogger } from './logger.ts'; + +export const V2_WS_PROTOCOL = 'v2.bsatn.spacetimedb'; +export const V3_WS_PROTOCOL = 'v3.bsatn.spacetimedb'; +export const PREFERRED_WS_PROTOCOLS = [V3_WS_PROTOCOL, V2_WS_PROTOCOL] as const; + +export type NegotiatedWsProtocol = + | typeof V2_WS_PROTOCOL + | typeof V3_WS_PROTOCOL; + +export function normalizeWsProtocol(protocol: string): NegotiatedWsProtocol { + if (protocol === V3_WS_PROTOCOL) { + return V3_WS_PROTOCOL; + } + // We treat an empty negotiated subprotocol as legacy v2 for compatibility. + if (protocol === '' || protocol === V2_WS_PROTOCOL) { + return V2_WS_PROTOCOL; + } + + stdbLogger( + 'warn', + `Unexpected websocket subprotocol "${protocol}", falling back to ${V2_WS_PROTOCOL}.` + ); + return V2_WS_PROTOCOL; +} diff --git a/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts b/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts index 6ac15f0e7fe..4c2c48b42f0 100644 --- a/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts +++ b/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts @@ -1,61 +1,109 @@ -import { BinaryReader, BinaryWriter } from '../'; +import BinaryReader from '../lib/binary_reader.ts'; +import BinaryWriter from '../lib/binary_writer.ts'; import { ClientMessage, ServerMessage } from './client_api/types'; import type { WebsocketAdapter } from './websocket_decompress_adapter'; +import { PREFERRED_WS_PROTOCOLS, V3_WS_PROTOCOL } from './websocket_protocols'; +import { + decodeClientMessagesV3, + encodeServerMessagesV3, +} from './websocket_v3_frames.ts'; class WebsocketTestAdapter implements WebsocketAdapter { - onclose: any; - // eslint-disable-next-line @typescript-eslint/no-unsafe-function-type - onopen!: () => void; - onmessage: any; - onerror: any; + protocol: string = ''; - messageQueue: any[]; + messageQueue: Uint8Array[]; outgoingMessages: ClientMessage[]; closed: boolean; + supportedProtocols: string[]; + + #onclose: (ev: CloseEvent) => void = () => {}; + #onopen: () => void = () => {}; + #onmessage: (msg: { data: Uint8Array }) => void = () => {}; constructor() { this.messageQueue = []; this.outgoingMessages = []; this.closed = false; + this.supportedProtocols = [...PREFERRED_WS_PROTOCOLS]; + } + + set onclose(handler: (ev: CloseEvent) => void) { + this.#onclose = handler; + } + + set onopen(handler: () => void) { + this.#onopen = handler; } - send(message: any): void { - const parsedMessage = ClientMessage.deserialize(new BinaryReader(message)); - this.outgoingMessages.push(parsedMessage); - // console.ClientMessageSerde.deserialize(message); - this.messageQueue.push(message); + set onmessage(handler: (msg: { data: Uint8Array }) => void) { + this.#onmessage = handler; + } + + set onerror(_handler: (msg: ErrorEvent) => void) {} + + send(message: Uint8Array): void { + const rawMessage = message.slice(); + const outgoingMessages = + this.protocol === V3_WS_PROTOCOL + ? decodeClientMessagesV3(rawMessage) + : [rawMessage]; + + for (const outgoingMessage of outgoingMessages) { + this.outgoingMessages.push( + ClientMessage.deserialize(new BinaryReader(outgoingMessage)) + ); + } + this.messageQueue.push(rawMessage); } close(): void { this.closed = true; - this.onclose?.({ code: 1000, reason: 'normal closure', wasClean: true }); + this.#onclose({ + code: 1000, + reason: 'normal closure', + wasClean: true, + } as CloseEvent); } acceptConnection(): void { - this.onopen(); + this.#onopen(); } sendToClient(message: ServerMessage): void { const writer = new BinaryWriter(1024); ServerMessage.serialize(writer, message); - const rawBytes = writer.getBuffer(); + const rawBytes = writer.getBuffer().slice(); // The brotli library's `compress` is somehow broken: it returns `null` for some inputs. // See https://github.com/foliojs/brotli.js/issues/36, which is closed but not actually fixed. // So we send the uncompressed data here, and in `spacetimedb.ts`, // if compression fails, we treat the raw message as having been uncompressed all along. // const data = compress(rawBytes); - this.onmessage({ data: rawBytes }); + const outboundData = + this.protocol === V3_WS_PROTOCOL + ? encodeServerMessagesV3(writer, [rawBytes]).slice() + : rawBytes; + this.#onmessage({ data: outboundData }); } async createWebSocketFn(_args: { url: URL; - wsProtocol: string; + wsProtocol: string | string[]; nameOrAddress: string; authToken?: string; compression: 'gzip' | 'none'; lightMode: boolean; confirmedReads?: boolean; }): Promise { + const requestedProtocols = Array.isArray(_args.wsProtocol) + ? _args.wsProtocol + : [_args.wsProtocol]; + const negotiatedProtocol = requestedProtocols.find(protocol => + this.supportedProtocols.includes(protocol) + ); + if (!negotiatedProtocol) { + return Promise.reject(new Error('No compatible websocket protocol')); + } + this.protocol = negotiatedProtocol; return this; } } diff --git a/crates/bindings-typescript/src/sdk/websocket_v3_frames.ts b/crates/bindings-typescript/src/sdk/websocket_v3_frames.ts new file mode 100644 index 00000000000..4aa494cb764 --- /dev/null +++ b/crates/bindings-typescript/src/sdk/websocket_v3_frames.ts @@ -0,0 +1,109 @@ +import BinaryReader from '../lib/binary_reader.ts'; +import BinaryWriter from '../lib/binary_writer.ts'; +import { + ClientFrame, + ServerFrame, + type ClientFrame as ClientFrameValue, + type ServerFrame as ServerFrameValue, +} from './client_api/v3'; + +// v3 is only a transport envelope. The inner payloads are already-encoded v2 +// websocket messages, so these helpers intentionally operate on raw bytes. +type V3FrameValue = ClientFrameValue | ServerFrameValue; + +function flattenFrame(frame: V3FrameValue): Uint8Array[] { + return frame.tag === 'Single' ? [frame.value] : frame.value; +} + +function ensureMessages(messages: readonly Uint8Array[]): void { + if (messages.length === 0) { + throw new RangeError( + 'v3 websocket frames must contain at least one message' + ); + } +} + +const BSATN_SUM_TAG_BYTES = 1; +const BSATN_LENGTH_PREFIX_BYTES = 4; + +function encodedSingleFrameSize(message: Uint8Array): number { + return BSATN_SUM_TAG_BYTES + BSATN_LENGTH_PREFIX_BYTES + message.length; +} + +function encodedBatchFrameSizeForFirstMessage(message: Uint8Array): number { + return ( + BSATN_SUM_TAG_BYTES + + BSATN_LENGTH_PREFIX_BYTES + + BSATN_LENGTH_PREFIX_BYTES + + message.length + ); +} + +function encodedBatchElementSize(message: Uint8Array): number { + return BSATN_LENGTH_PREFIX_BYTES + message.length; +} + +export function countClientMessagesForV3Frame( + messages: readonly Uint8Array[], + maxFrameBytes: number +): number { + ensureMessages(messages); + + const firstMessage = messages[0]!; + if (encodedSingleFrameSize(firstMessage) > maxFrameBytes) { + return 1; + } + + let count = 1; + let batchSize = encodedBatchFrameSizeForFirstMessage(firstMessage); + while (count < messages.length) { + const nextMessage = messages[count]!; + const nextBatchSize = batchSize + encodedBatchElementSize(nextMessage); + if (nextBatchSize > maxFrameBytes) { + break; + } + batchSize = nextBatchSize; + count += 1; + } + return count; +} + +export function encodeClientMessagesV3( + writer: BinaryWriter, + messages: readonly Uint8Array[] +): Uint8Array { + ensureMessages(messages); + writer.clear(); + if (messages.length === 1) { + ClientFrame.serialize(writer, ClientFrame.Single(messages[0]!)); + } else { + ClientFrame.serialize(writer, ClientFrame.Batch(Array.from(messages))); + } + return writer.getBuffer(); +} + +export function decodeClientMessagesV3(data: Uint8Array): Uint8Array[] { + return flattenFrame(ClientFrame.deserialize(new BinaryReader(data))); +} + +export function encodeServerMessagesV3( + writer: BinaryWriter, + messages: readonly Uint8Array[] +): Uint8Array { + ensureMessages(messages); + writer.clear(); + if (messages.length === 1) { + ServerFrame.serialize(writer, ServerFrame.Single(messages[0]!)); + } else { + ServerFrame.serialize(writer, ServerFrame.Batch(Array.from(messages))); + } + return writer.getBuffer(); +} + +export function decodeServerMessagesV3( + reader: BinaryReader, + data: Uint8Array +): Uint8Array[] { + reader.reset(data); + return flattenFrame(ServerFrame.deserialize(reader)); +} diff --git a/crates/bindings-typescript/tests/db_connection.test.ts b/crates/bindings-typescript/tests/db_connection.test.ts index ec17430e41a..f4eca70f245 100644 --- a/crates/bindings-typescript/tests/db_connection.test.ts +++ b/crates/bindings-typescript/tests/db_connection.test.ts @@ -1,5 +1,6 @@ import { beforeEach, describe, expect, test } from 'vitest'; import { + BinaryReader, BinaryWriter, ConnectionId, Identity, @@ -8,8 +9,10 @@ import { Timestamp, type Infer, } from '../src'; +import { ClientFrame } from '../src/sdk/client_api/v3'; import { ServerMessage } from '../src/sdk/client_api/types'; import WebsocketTestAdapter from '../src/sdk/websocket_test_adapter'; +import { V2_WS_PROTOCOL, V3_WS_PROTOCOL } from '../src/sdk/websocket_protocols'; import { DbConnection } from '../test-app/src/module_bindings'; import User from '../test-app/src/module_bindings/user_table'; import { @@ -194,6 +197,69 @@ describe('DbConnection', () => { expect(called).toBeTruthy(); }); + test('batches same-tick reducer calls when v3 is negotiated', async () => { + const wsAdapter = new WebsocketTestAdapter(); + const client = DbConnection.builder() + .withUri('ws://127.0.0.1:1234') + .withDatabaseName('db') + .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .build(); + + await client['wsPromise']; + wsAdapter.acceptConnection(); + + void client.reducers.createPlayer({ + name: 'Player One', + location: { x: 1, y: 2 }, + }); + void client.reducers.createPlayer({ + name: 'Player Two', + location: { x: 3, y: 4 }, + }); + + await Promise.resolve(); + + expect(wsAdapter.protocol).toEqual(V3_WS_PROTOCOL); + expect(wsAdapter.messageQueue).toHaveLength(1); + expect(wsAdapter.outgoingMessages).toHaveLength(2); + + const outboundFrame = ClientFrame.deserialize( + new BinaryReader(wsAdapter.messageQueue[0]) + ); + expect(outboundFrame.tag).toEqual('Batch'); + if (outboundFrame.tag === 'Batch') { + expect(outboundFrame.value).toHaveLength(2); + } + }); + + test('falls back to v2 and does not batch reducer calls when v3 is unavailable', async () => { + const wsAdapter = new WebsocketTestAdapter(); + wsAdapter.supportedProtocols = [V2_WS_PROTOCOL]; + const client = DbConnection.builder() + .withUri('ws://127.0.0.1:1234') + .withDatabaseName('db') + .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .build(); + + await client['wsPromise']; + wsAdapter.acceptConnection(); + + void client.reducers.createPlayer({ + name: 'Player One', + location: { x: 1, y: 2 }, + }); + void client.reducers.createPlayer({ + name: 'Player Two', + location: { x: 3, y: 4 }, + }); + + await Promise.resolve(); + + expect(wsAdapter.protocol).toEqual(V2_WS_PROTOCOL); + expect(wsAdapter.messageQueue).toHaveLength(2); + expect(wsAdapter.outgoingMessages).toHaveLength(2); + }); + test('disconnects when SubscriptionError has no requestId', async () => { const onDisconnectPromise = new Deferred(); const wsAdapter = new WebsocketTestAdapter(); @@ -750,6 +816,7 @@ describe('DbConnection', () => { .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) .build(); await client['wsPromise']; + wsAdapter.acceptConnection(); const user1 = { identity: bobIdentity, username: 'bob' }; const user2 = { identity: sallyIdentity, diff --git a/crates/bindings-typescript/tests/websocket_v3_frames.test.ts b/crates/bindings-typescript/tests/websocket_v3_frames.test.ts new file mode 100644 index 00000000000..d0ac7dbe258 --- /dev/null +++ b/crates/bindings-typescript/tests/websocket_v3_frames.test.ts @@ -0,0 +1,25 @@ +import { describe, expect, test } from 'vitest'; +import { countClientMessagesForV3Frame } from '../src/sdk/websocket_v3_frames'; + +describe('websocket_v3_frames', () => { + test('counts as many client messages as fit within the encoded frame limit', () => { + const messages = [ + new Uint8Array(10), + new Uint8Array(20), + new Uint8Array(30), + ]; + + expect(countClientMessagesForV3Frame(messages, 1 + 4 + 10)).toBe(1); + expect( + countClientMessagesForV3Frame(messages, 1 + 4 + 4 + 10 + 4 + 20) + ).toBe(2); + expect( + countClientMessagesForV3Frame(messages, 1 + 4 + 4 + 10 + 4 + 20 + 4 + 30) + ).toBe(3); + }); + + test('still emits an oversized first message on its own', () => { + const messages = [new Uint8Array(300_000), new Uint8Array(10)]; + expect(countClientMessagesForV3Frame(messages, 256 * 1024)).toBe(1); + }); +});