Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/client-api-messages/DEVELOP.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,12 @@ spacetime generate -p spacetimedb-cli --lang <SDK lang> \
--out-dir <sdk WebSocket schema bindings dir> \
--module-def ws_schema_v2.json
```

For the v3 WebSocket transport schema:

```sh
cargo run --example get_ws_schema_v3 > ws_schema_v3.json
spacetime generate -p spacetimedb-cli --lang <SDK lang> \
--out-dir <sdk WebSocket schema bindings dir> \
--module-def ws_schema_v3.json
```
13 changes: 13 additions & 0 deletions crates/client-api-messages/examples/get_ws_schema_v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use spacetimedb_client_api_messages::websocket::v3::{ClientFrame, ServerFrame};
use spacetimedb_lib::ser::serde::SerializeWrapper;
use spacetimedb_lib::{RawModuleDef, RawModuleDefV8};

fn main() -> Result<(), serde_json::Error> {
let module = RawModuleDefV8::with_builder(|module| {
module.add_type::<ClientFrame>();
module.add_type::<ServerFrame>();
});
let module = RawModuleDef::V8BackCompat(module);

serde_json::to_writer(std::io::stdout().lock(), SerializeWrapper::from_ref(&module))
}
1 change: 1 addition & 0 deletions crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
pub mod common;
pub mod v1;
pub mod v2;
pub mod v3;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you prefer adding a new version with a wrapper, rather than adding new message variants to v2?

28 changes: 28 additions & 0 deletions crates/client-api-messages/src/websocket/v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use bytes::Bytes;
pub use spacetimedb_sats::SpacetimeType;

pub const BIN_PROTOCOL: &str = "v3.bsatn.spacetimedb";

/// Transport envelopes sent by the client over the v3 websocket protocol.
///
/// The inner bytes are BSATN-encoded v2 [`crate::websocket::v2::ClientMessage`] values.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub enum ClientFrame {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm; both Single and Batch could actually be stored as just Bytes with the logic that the host will try to deserialize as many ClientMessages as it can until the read buffer has been exhausted.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we could go for a more structured / typed representation here, with Box<[super::v2::ClientMessage]>. This would still include some of the overhead/bookkeeping bytes and allocations which Mazdak's suggestion eliminates, but it would add type safety and eliminate a lot of ser/de boilerplate code. That would look like:

enum ClientFrame {
    Single(super::v2::ClientMessage),
    Batch(Box<[super::v2::ClientMessage]>),
}

And, you know, doing the same thing to ServerMessage.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah if we go for the tagged representation then a typed one seems better. However, given that the goal of this PR is perf, it seems to me that we should go for the representation with the least overhead. (This PR actually is a regression for the single case, but with my suggestion, the added overhead is a single branch rather than a full extra allocation / taking from the pool + memcpy.)

/// A single logical client message.
Single(Bytes),
/// Multiple logical client messages that should be processed in-order.
Batch(Box<[Bytes]>),
}

/// Transport envelopes sent by the server over the v3 websocket protocol.
///
/// The inner bytes are BSATN-encoded v2 [`crate::websocket::v2::ServerMessage`] values.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub enum ServerFrame {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ServerFrame could also just be Bytes.

/// A single logical server message.
Single(Bytes),
/// Multiple logical server messages that should be processed in-order.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Multiple logical server messages that should be processed in-order.
/// Multiple logical server messages that should be processed in-order.
///
/// This is currently never produced.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the intended semantics for ServerFrame::Batch? Will the server always respond ServerFrame::Single to ClientFrame::Single and ServerFrame::Batch to ClientFrame::Batch, or is the server free to re-group messages so long as order is preserved?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also wondering this xD

Batch(Box<[Bytes]>),
}
132 changes: 95 additions & 37 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use prometheus::{Histogram, IntGauge};
use scopeguard::{defer, ScopeGuard};
use serde::Deserialize;
use spacetimedb::client::messages::{
serialize, serialize_v2, IdentityTokenMessage, InUseSerializeBuffer, SerializeBuffer, SwitchedServerMessage,
ToProtocol,
serialize, serialize_v2, serialize_v3, IdentityTokenMessage, InUseSerializeBuffer, SerializeBuffer,
SwitchedServerMessage, ToProtocol,
};
use spacetimedb::client::{
ClientActorId, ClientConfig, ClientConnection, ClientConnectionReceiver, DataMessage, MessageExecutionError,
Expand All @@ -38,6 +38,7 @@ use spacetimedb::worker_metrics::WORKER_METRICS;
use spacetimedb::Identity;
use spacetimedb_client_api_messages::websocket::v1 as ws_v1;
use spacetimedb_client_api_messages::websocket::v2 as ws_v2;
use spacetimedb_client_api_messages::websocket::v3 as ws_v3;
use spacetimedb_datastore::execution_context::WorkloadType;
use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl};
use tokio::sync::{mpsc, watch};
Expand All @@ -62,6 +63,8 @@ pub const TEXT_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::TEXT_PROT
pub const BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::BIN_PROTOCOL);
#[allow(clippy::declare_interior_mutable_const)]
pub const V2_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v2::BIN_PROTOCOL);
#[allow(clippy::declare_interior_mutable_const)]
pub const V3_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v3::BIN_PROTOCOL);

pub trait HasWebSocketOptions {
fn websocket_options(&self) -> WebSocketOptions;
Expand Down Expand Up @@ -101,7 +104,7 @@ fn resolve_confirmed_reads_default(version: WsVersion, confirmed: Option<bool>)
}
match version {
WsVersion::V1 => false,
WsVersion::V2 => crate::DEFAULT_CONFIRMED_READS,
WsVersion::V2 | WsVersion::V3 => crate::DEFAULT_CONFIRMED_READS,
}
}

Expand Down Expand Up @@ -151,6 +154,13 @@ where
}

let (res, ws_upgrade, protocol) = ws.select_protocol([
(
V3_BIN_PROTOCOL,
NegotiatedProtocol {
protocol: Protocol::Binary,
version: WsVersion::V3,
},
),
(
V2_BIN_PROTOCOL,
NegotiatedProtocol {
Expand Down Expand Up @@ -284,7 +294,7 @@ where
};
client.send_message(None, OutboundMessage::V1(message.into()))
}
WsVersion::V2 => {
WsVersion::V2 | WsVersion::V3 => {
let message = ws_v2::ServerMessage::InitialConnection(ws_v2::InitialConnection {
identity: client_identity,
connection_id,
Expand Down Expand Up @@ -1293,10 +1303,15 @@ async fn ws_encode_task(
// copied to the wire. Since we don't know when that will happen, we prepare
// for a few messages to be in-flight, i.e. encoded but not yet sent.
const BUF_POOL_CAPACITY: usize = 16;
let binary_message_serializer = match config.version {
WsVersion::V1 => None,
WsVersion::V2 => Some(serialize_v2 as BinarySerializeFn),
WsVersion::V3 => Some(serialize_v3 as BinarySerializeFn),
};
let buf_pool = ArrayQueue::new(BUF_POOL_CAPACITY);
let mut in_use_bufs: Vec<ScopeGuard<InUseSerializeBuffer, _>> = Vec::with_capacity(BUF_POOL_CAPACITY);

while let Some(message) = messages.recv().await {
'send: while let Some(message) = messages.recv().await {
Copy link
Copy Markdown
Contributor

@Centril Centril Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just for added readability? (I don't mind it.)

// Drop serialize buffers with no external referent,
// returning them to the pool.
in_use_bufs.retain(|in_use| !in_use.is_unique());
Expand All @@ -1306,55 +1321,70 @@ async fn ws_encode_task(

let in_use_buf = match message {
OutboundWsMessage::Error(message) => {
if config.version == WsVersion::V2 {
log::error!("dropping v1 error message sent to a v2 client: {:?}", message);
if config.version != WsVersion::V1 {
log::error!(
"dropping v1 error message sent to a binary websocket client: {:?}",
message
);
continue;
}
let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await;
metrics.report(None, None, stats);
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
break;
}

let Ok(in_use) = ws_forward_frames(
&metrics,
&outgoing_frames,
None,
None,
ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await,
) else {
break 'send;
};
in_use
}
OutboundWsMessage::Message(message) => {
let workload = message.workload();
let num_rows = message.num_rows();
match message {
OutboundMessage::V2(server_message) => {
if config.version != WsVersion::V2 {
if config.version == WsVersion::V1 {
log::error!("dropping v2 message on v1 connection");
continue;
}

let (stats, in_use, mut frames) =
ws_encode_message_v2(config, buf, server_message, false, &bsatn_rlb_pool).await;
metrics.report(workload, num_rows, stats);
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
break;
}

let Ok(in_use) = ws_forward_frames(
&metrics,
&outgoing_frames,
workload,
num_rows,
ws_encode_binary_message(
config,
buf,
server_message,
binary_message_serializer.expect("v2 message should not be sent on a v1 connection"),
false,
&bsatn_rlb_pool,
)
.await,
) else {
break 'send;
};
in_use
}
OutboundMessage::V1(message) => {
if config.version == WsVersion::V2 {
log::error!(
"dropping v1 message for v2 connection until v2 serialization is implemented: {:?}",
message
);
if config.version != WsVersion::V1 {
log::error!("dropping v1 message for a binary websocket connection: {:?}", message);
continue;
}

let is_large = num_rows.is_some_and(|n| n > 1024);

let (stats, in_use, mut frames) =
ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await;
metrics.report(workload, num_rows, stats);
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
break;
}

let Ok(in_use) = ws_forward_frames(
&metrics,
&outgoing_frames,
workload,
num_rows,
ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await,
) else {
break 'send;
};
in_use
}
}
Expand All @@ -1370,6 +1400,24 @@ async fn ws_encode_task(
}
}

/// Reports encode metrics for an already-encoded message and forwards all of
/// its frames to the websocket send task.
fn ws_forward_frames<I>(
metrics: &SendMetrics,
outgoing_frames: &mpsc::UnboundedSender<Frame>,
workload: Option<WorkloadType>,
num_rows: Option<usize>,
encoded: (EncodeMetrics, InUseSerializeBuffer, I),
) -> Result<InUseSerializeBuffer, mpsc::error::SendError<Frame>>
where
I: Iterator<Item = Frame>,
{
Comment on lines +1403 to +1414
Copy link
Copy Markdown
Contributor

@Centril Centril Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Reports encode metrics for an already-encoded message and forwards all of
/// its frames to the websocket send task.
fn ws_forward_frames<I>(
metrics: &SendMetrics,
outgoing_frames: &mpsc::UnboundedSender<Frame>,
workload: Option<WorkloadType>,
num_rows: Option<usize>,
encoded: (EncodeMetrics, InUseSerializeBuffer, I),
) -> Result<InUseSerializeBuffer, mpsc::error::SendError<Frame>>
where
I: Iterator<Item = Frame>,
{
/// Reports encode metrics for an already-encoded message
/// and forwards all of its frames to the websocket send task.
fn ws_forward_frames(
metrics: &SendMetrics,
outgoing_frames: &mpsc::UnboundedSender<Frame>,
workload: Option<WorkloadType>,
num_rows: Option<usize>,
encoded: (EncodeMetrics, InUseSerializeBuffer, impl Iterator<Item = Frame>),
) -> Result<InUseSerializeBuffer, mpsc::error::SendError<Frame>> {

(not so important, just a nit)

let (stats, in_use, frames) = encoded;
metrics.report(workload, num_rows, stats);
frames.into_iter().try_for_each(|frame| outgoing_frames.send(frame))?;
Ok(in_use)
}

/// Some stats about serialization and compression.
///
/// Returned by [`ws_encode_message`].
Expand Down Expand Up @@ -1443,21 +1491,29 @@ async fn ws_encode_message(
(metrics, msg_alloc, frames)
}

#[allow(dead_code, unused_variables)]
async fn ws_encode_message_v2(
type BinarySerializeFn = fn(
&BsatnRowListBuilderPool,
SerializeBuffer,
ws_v2::ServerMessage,
ws_v1::Compression,
) -> (InUseSerializeBuffer, Bytes);

async fn ws_encode_binary_message(
config: ClientConfig,
buf: SerializeBuffer,
message: ws_v2::ServerMessage,
serialize_message: BinarySerializeFn,
is_large_message: bool,
bsatn_rlb_pool: &BsatnRowListBuilderPool,
) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator<Item = Frame> + use<>) {
let start = Instant::now();
let compression = config.compression;

let (in_use, data) = if is_large_message {
let bsatn_rlb_pool = bsatn_rlb_pool.clone();
spawn_rayon(move || serialize_v2(&bsatn_rlb_pool, buf, message, config.compression)).await
spawn_rayon(move || serialize_message(&bsatn_rlb_pool, buf, message, compression)).await
} else {
serialize_v2(bsatn_rlb_pool, buf, message, config.compression)
serialize_message(bsatn_rlb_pool, buf, message, compression)
};

let metrics = EncodeMetrics {
Expand Down Expand Up @@ -2298,9 +2354,11 @@ mod tests {

#[test]
fn confirmed_reads_default_depends_on_ws_version() {
assert!(resolve_confirmed_reads_default(WsVersion::V3, None));
assert!(resolve_confirmed_reads_default(WsVersion::V2, None));
assert!(!resolve_confirmed_reads_default(WsVersion::V1, None));
assert!(resolve_confirmed_reads_default(WsVersion::V1, Some(true)));
assert!(!resolve_confirmed_reads_default(WsVersion::V3, Some(false)));
assert!(!resolve_confirmed_reads_default(WsVersion::V2, Some(false)));
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod consume_each_list;
mod message_handlers;
mod message_handlers_v1;
mod message_handlers_v2;
mod message_handlers_v3;
pub mod messages;

pub use client_connection::{
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub enum Protocol {
pub enum WsVersion {
V1,
V2,
V3,
}

impl Protocol {
Expand Down Expand Up @@ -384,7 +385,7 @@ impl ClientConnectionSender {
debug_assert!(
matches!(
(&self.config.version, &message),
(WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2, OutboundMessage::V2(_))
(WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2 | WsVersion::V3, OutboundMessage::V2(_))
),
"attempted to send message variant that does not match client websocket version"
);
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
match client.config.version {
WsVersion::V1 => super::message_handlers_v1::handle(client, message, timer).await,
WsVersion::V2 => super::message_handlers_v2::handle(client, message, timer).await,
WsVersion::V3 => super::message_handlers_v3::handle(client, message, timer).await,
}
}
8 changes: 8 additions & 0 deletions crates/core/src/client/message_handlers_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
)))
}
};
handle_decoded_message(client, message, timer).await
}

pub(super) async fn handle_decoded_message(
client: &ClientConnection,
message: ws_v2::ClientMessage,
timer: Instant,
) -> Result<(), MessageHandleError> {
let module = client.module();
let mod_info = module.info();
let mod_metrics = &mod_info.metrics;
Expand Down
32 changes: 32 additions & 0 deletions crates/core/src/client/message_handlers_v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use super::{ClientConnection, DataMessage, MessageHandleError};
use serde::de::Error as _;
use spacetimedb_client_api_messages::websocket::{v2 as ws_v2, v3 as ws_v3};
use spacetimedb_lib::bsatn;
use std::time::Instant;

pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Instant) -> Result<(), MessageHandleError> {
client.observe_websocket_request_message(&message);
let frame = match message {
DataMessage::Binary(message_buf) => bsatn::from_slice::<ws_v3::ClientFrame>(&message_buf)?,
DataMessage::Text(_) => {
return Err(MessageHandleError::TextDecode(serde_json::Error::custom(
"v3 websocket does not support text messages",
)))
}
};

match frame {
ws_v3::ClientFrame::Single(message) => {
let message = bsatn::from_slice::<ws_v2::ClientMessage>(&message)?;
super::message_handlers_v2::handle_decoded_message(client, message, timer).await?;
}
ws_v3::ClientFrame::Batch(messages) => {
for message in messages {
let message = bsatn::from_slice::<ws_v2::ClientMessage>(&message)?;
super::message_handlers_v2::handle_decoded_message(client, message, timer).await?;
}
}
}
Comment on lines +9 to +29
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ClientFrame is just message: Bytes (directly from DataMessage), the V2 and V3 code for handle could be the same and we wouldn't need the temporary allocation.


Ok(())
}
Loading
Loading