-
Notifications
You must be signed in to change notification settings - Fork 989
v3 websocket protocol #4761
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
v3 websocket protocol #4761
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| use spacetimedb_client_api_messages::websocket::v3::{ClientFrame, ServerFrame}; | ||
| use spacetimedb_lib::ser::serde::SerializeWrapper; | ||
| use spacetimedb_lib::{RawModuleDef, RawModuleDefV8}; | ||
|
|
||
| fn main() -> Result<(), serde_json::Error> { | ||
| let module = RawModuleDefV8::with_builder(|module| { | ||
| module.add_type::<ClientFrame>(); | ||
| module.add_type::<ServerFrame>(); | ||
| }); | ||
| let module = RawModuleDef::V8BackCompat(module); | ||
|
|
||
| serde_json::to_writer(std::io::stdout().lock(), SerializeWrapper::from_ref(&module)) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,3 +17,4 @@ | |
| pub mod common; | ||
| pub mod v1; | ||
| pub mod v2; | ||
| pub mod v3; | ||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,28 @@ | ||||||||||
| use bytes::Bytes; | ||||||||||
| pub use spacetimedb_sats::SpacetimeType; | ||||||||||
|
|
||||||||||
| pub const BIN_PROTOCOL: &str = "v3.bsatn.spacetimedb"; | ||||||||||
|
|
||||||||||
| /// Transport envelopes sent by the client over the v3 websocket protocol. | ||||||||||
| /// | ||||||||||
| /// The inner bytes are BSATN-encoded v2 [`crate::websocket::v2::ClientMessage`] values. | ||||||||||
| #[derive(SpacetimeType, Debug)] | ||||||||||
| #[sats(crate = spacetimedb_lib)] | ||||||||||
| pub enum ClientFrame { | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm; both
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alternatively, we could go for a more structured / typed representation here, with enum ClientFrame {
Single(super::v2::ClientMessage),
Batch(Box<[super::v2::ClientMessage]>),
}And, you know, doing the same thing to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah if we go for the tagged representation then a typed one seems better. However, given that the goal of this PR is perf, it seems to me that we should go for the representation with the least overhead. (This PR actually is a regression for the single case, but with my suggestion, the added overhead is a single branch rather than a full extra allocation / taking from the pool + memcpy.) |
||||||||||
| /// A single logical client message. | ||||||||||
| Single(Bytes), | ||||||||||
| /// Multiple logical client messages that should be processed in-order. | ||||||||||
| Batch(Box<[Bytes]>), | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /// Transport envelopes sent by the server over the v3 websocket protocol. | ||||||||||
| /// | ||||||||||
| /// The inner bytes are BSATN-encoded v2 [`crate::websocket::v2::ServerMessage`] values. | ||||||||||
| #[derive(SpacetimeType, Debug)] | ||||||||||
| #[sats(crate = spacetimedb_lib)] | ||||||||||
| pub enum ServerFrame { | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||||||
| /// A single logical server message. | ||||||||||
| Single(Bytes), | ||||||||||
| /// Multiple logical server messages that should be processed in-order. | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What are the intended semantics for
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was also wondering this xD |
||||||||||
| Batch(Box<[Bytes]>), | ||||||||||
| } | ||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,8 +23,8 @@ use prometheus::{Histogram, IntGauge}; | |||||||||||||||||||||||||||||||||||||||||||
| use scopeguard::{defer, ScopeGuard}; | ||||||||||||||||||||||||||||||||||||||||||||
| use serde::Deserialize; | ||||||||||||||||||||||||||||||||||||||||||||
| use spacetimedb::client::messages::{ | ||||||||||||||||||||||||||||||||||||||||||||
| serialize, serialize_v2, IdentityTokenMessage, InUseSerializeBuffer, SerializeBuffer, SwitchedServerMessage, | ||||||||||||||||||||||||||||||||||||||||||||
| ToProtocol, | ||||||||||||||||||||||||||||||||||||||||||||
| serialize, serialize_v2, serialize_v3, IdentityTokenMessage, InUseSerializeBuffer, SerializeBuffer, | ||||||||||||||||||||||||||||||||||||||||||||
| SwitchedServerMessage, ToProtocol, | ||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||
| use spacetimedb::client::{ | ||||||||||||||||||||||||||||||||||||||||||||
| ClientActorId, ClientConfig, ClientConnection, ClientConnectionReceiver, DataMessage, MessageExecutionError, | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -38,6 +38,7 @@ use spacetimedb::worker_metrics::WORKER_METRICS; | |||||||||||||||||||||||||||||||||||||||||||
| use spacetimedb::Identity; | ||||||||||||||||||||||||||||||||||||||||||||
| use spacetimedb_client_api_messages::websocket::v1 as ws_v1; | ||||||||||||||||||||||||||||||||||||||||||||
| use spacetimedb_client_api_messages::websocket::v2 as ws_v2; | ||||||||||||||||||||||||||||||||||||||||||||
| use spacetimedb_client_api_messages::websocket::v3 as ws_v3; | ||||||||||||||||||||||||||||||||||||||||||||
| use spacetimedb_datastore::execution_context::WorkloadType; | ||||||||||||||||||||||||||||||||||||||||||||
| use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl}; | ||||||||||||||||||||||||||||||||||||||||||||
| use tokio::sync::{mpsc, watch}; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -62,6 +63,8 @@ pub const TEXT_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::TEXT_PROT | |||||||||||||||||||||||||||||||||||||||||||
| pub const BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::BIN_PROTOCOL); | ||||||||||||||||||||||||||||||||||||||||||||
| #[allow(clippy::declare_interior_mutable_const)] | ||||||||||||||||||||||||||||||||||||||||||||
| pub const V2_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v2::BIN_PROTOCOL); | ||||||||||||||||||||||||||||||||||||||||||||
| #[allow(clippy::declare_interior_mutable_const)] | ||||||||||||||||||||||||||||||||||||||||||||
| pub const V3_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v3::BIN_PROTOCOL); | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| pub trait HasWebSocketOptions { | ||||||||||||||||||||||||||||||||||||||||||||
| fn websocket_options(&self) -> WebSocketOptions; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -101,7 +104,7 @@ fn resolve_confirmed_reads_default(version: WsVersion, confirmed: Option<bool>) | |||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| match version { | ||||||||||||||||||||||||||||||||||||||||||||
| WsVersion::V1 => false, | ||||||||||||||||||||||||||||||||||||||||||||
| WsVersion::V2 => crate::DEFAULT_CONFIRMED_READS, | ||||||||||||||||||||||||||||||||||||||||||||
| WsVersion::V2 | WsVersion::V3 => crate::DEFAULT_CONFIRMED_READS, | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -151,6 +154,13 @@ where | |||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| let (res, ws_upgrade, protocol) = ws.select_protocol([ | ||||||||||||||||||||||||||||||||||||||||||||
| ( | ||||||||||||||||||||||||||||||||||||||||||||
| V3_BIN_PROTOCOL, | ||||||||||||||||||||||||||||||||||||||||||||
| NegotiatedProtocol { | ||||||||||||||||||||||||||||||||||||||||||||
| protocol: Protocol::Binary, | ||||||||||||||||||||||||||||||||||||||||||||
| version: WsVersion::V3, | ||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||
| ( | ||||||||||||||||||||||||||||||||||||||||||||
| V2_BIN_PROTOCOL, | ||||||||||||||||||||||||||||||||||||||||||||
| NegotiatedProtocol { | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -284,7 +294,7 @@ where | |||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||
| client.send_message(None, OutboundMessage::V1(message.into())) | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| WsVersion::V2 => { | ||||||||||||||||||||||||||||||||||||||||||||
| WsVersion::V2 | WsVersion::V3 => { | ||||||||||||||||||||||||||||||||||||||||||||
| let message = ws_v2::ServerMessage::InitialConnection(ws_v2::InitialConnection { | ||||||||||||||||||||||||||||||||||||||||||||
| identity: client_identity, | ||||||||||||||||||||||||||||||||||||||||||||
| connection_id, | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1293,10 +1303,15 @@ async fn ws_encode_task( | |||||||||||||||||||||||||||||||||||||||||||
| // copied to the wire. Since we don't know when that will happen, we prepare | ||||||||||||||||||||||||||||||||||||||||||||
| // for a few messages to be in-flight, i.e. encoded but not yet sent. | ||||||||||||||||||||||||||||||||||||||||||||
| const BUF_POOL_CAPACITY: usize = 16; | ||||||||||||||||||||||||||||||||||||||||||||
| let binary_message_serializer = match config.version { | ||||||||||||||||||||||||||||||||||||||||||||
| WsVersion::V1 => None, | ||||||||||||||||||||||||||||||||||||||||||||
| WsVersion::V2 => Some(serialize_v2 as BinarySerializeFn), | ||||||||||||||||||||||||||||||||||||||||||||
| WsVersion::V3 => Some(serialize_v3 as BinarySerializeFn), | ||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||
| let buf_pool = ArrayQueue::new(BUF_POOL_CAPACITY); | ||||||||||||||||||||||||||||||||||||||||||||
| let mut in_use_bufs: Vec<ScopeGuard<InUseSerializeBuffer, _>> = Vec::with_capacity(BUF_POOL_CAPACITY); | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| while let Some(message) = messages.recv().await { | ||||||||||||||||||||||||||||||||||||||||||||
| 'send: while let Some(message) = messages.recv().await { | ||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this just for added readability? (I don't mind it.) |
||||||||||||||||||||||||||||||||||||||||||||
| // Drop serialize buffers with no external referent, | ||||||||||||||||||||||||||||||||||||||||||||
| // returning them to the pool. | ||||||||||||||||||||||||||||||||||||||||||||
| in_use_bufs.retain(|in_use| !in_use.is_unique()); | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1306,55 +1321,70 @@ async fn ws_encode_task( | |||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| let in_use_buf = match message { | ||||||||||||||||||||||||||||||||||||||||||||
| OutboundWsMessage::Error(message) => { | ||||||||||||||||||||||||||||||||||||||||||||
| if config.version == WsVersion::V2 { | ||||||||||||||||||||||||||||||||||||||||||||
| log::error!("dropping v1 error message sent to a v2 client: {:?}", message); | ||||||||||||||||||||||||||||||||||||||||||||
| if config.version != WsVersion::V1 { | ||||||||||||||||||||||||||||||||||||||||||||
| log::error!( | ||||||||||||||||||||||||||||||||||||||||||||
| "dropping v1 error message sent to a binary websocket client: {:?}", | ||||||||||||||||||||||||||||||||||||||||||||
| message | ||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await; | ||||||||||||||||||||||||||||||||||||||||||||
| metrics.report(None, None, stats); | ||||||||||||||||||||||||||||||||||||||||||||
| if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { | ||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| let Ok(in_use) = ws_forward_frames( | ||||||||||||||||||||||||||||||||||||||||||||
| &metrics, | ||||||||||||||||||||||||||||||||||||||||||||
| &outgoing_frames, | ||||||||||||||||||||||||||||||||||||||||||||
| None, | ||||||||||||||||||||||||||||||||||||||||||||
| None, | ||||||||||||||||||||||||||||||||||||||||||||
| ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await, | ||||||||||||||||||||||||||||||||||||||||||||
| ) else { | ||||||||||||||||||||||||||||||||||||||||||||
| break 'send; | ||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||
| in_use | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| OutboundWsMessage::Message(message) => { | ||||||||||||||||||||||||||||||||||||||||||||
| let workload = message.workload(); | ||||||||||||||||||||||||||||||||||||||||||||
| let num_rows = message.num_rows(); | ||||||||||||||||||||||||||||||||||||||||||||
| match message { | ||||||||||||||||||||||||||||||||||||||||||||
| OutboundMessage::V2(server_message) => { | ||||||||||||||||||||||||||||||||||||||||||||
| if config.version != WsVersion::V2 { | ||||||||||||||||||||||||||||||||||||||||||||
| if config.version == WsVersion::V1 { | ||||||||||||||||||||||||||||||||||||||||||||
| log::error!("dropping v2 message on v1 connection"); | ||||||||||||||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| let (stats, in_use, mut frames) = | ||||||||||||||||||||||||||||||||||||||||||||
| ws_encode_message_v2(config, buf, server_message, false, &bsatn_rlb_pool).await; | ||||||||||||||||||||||||||||||||||||||||||||
| metrics.report(workload, num_rows, stats); | ||||||||||||||||||||||||||||||||||||||||||||
| if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { | ||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| let Ok(in_use) = ws_forward_frames( | ||||||||||||||||||||||||||||||||||||||||||||
| &metrics, | ||||||||||||||||||||||||||||||||||||||||||||
| &outgoing_frames, | ||||||||||||||||||||||||||||||||||||||||||||
| workload, | ||||||||||||||||||||||||||||||||||||||||||||
| num_rows, | ||||||||||||||||||||||||||||||||||||||||||||
| ws_encode_binary_message( | ||||||||||||||||||||||||||||||||||||||||||||
| config, | ||||||||||||||||||||||||||||||||||||||||||||
| buf, | ||||||||||||||||||||||||||||||||||||||||||||
| server_message, | ||||||||||||||||||||||||||||||||||||||||||||
| binary_message_serializer.expect("v2 message should not be sent on a v1 connection"), | ||||||||||||||||||||||||||||||||||||||||||||
| false, | ||||||||||||||||||||||||||||||||||||||||||||
| &bsatn_rlb_pool, | ||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||
| .await, | ||||||||||||||||||||||||||||||||||||||||||||
| ) else { | ||||||||||||||||||||||||||||||||||||||||||||
| break 'send; | ||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||
| in_use | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| OutboundMessage::V1(message) => { | ||||||||||||||||||||||||||||||||||||||||||||
| if config.version == WsVersion::V2 { | ||||||||||||||||||||||||||||||||||||||||||||
| log::error!( | ||||||||||||||||||||||||||||||||||||||||||||
| "dropping v1 message for v2 connection until v2 serialization is implemented: {:?}", | ||||||||||||||||||||||||||||||||||||||||||||
| message | ||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||
| if config.version != WsVersion::V1 { | ||||||||||||||||||||||||||||||||||||||||||||
| log::error!("dropping v1 message for a binary websocket connection: {:?}", message); | ||||||||||||||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| let is_large = num_rows.is_some_and(|n| n > 1024); | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| let (stats, in_use, mut frames) = | ||||||||||||||||||||||||||||||||||||||||||||
| ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await; | ||||||||||||||||||||||||||||||||||||||||||||
| metrics.report(workload, num_rows, stats); | ||||||||||||||||||||||||||||||||||||||||||||
| if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { | ||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| let Ok(in_use) = ws_forward_frames( | ||||||||||||||||||||||||||||||||||||||||||||
| &metrics, | ||||||||||||||||||||||||||||||||||||||||||||
| &outgoing_frames, | ||||||||||||||||||||||||||||||||||||||||||||
| workload, | ||||||||||||||||||||||||||||||||||||||||||||
| num_rows, | ||||||||||||||||||||||||||||||||||||||||||||
| ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await, | ||||||||||||||||||||||||||||||||||||||||||||
| ) else { | ||||||||||||||||||||||||||||||||||||||||||||
| break 'send; | ||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||
| in_use | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1370,6 +1400,24 @@ async fn ws_encode_task( | |||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| /// Reports encode metrics for an already-encoded message and forwards all of | ||||||||||||||||||||||||||||||||||||||||||||
| /// its frames to the websocket send task. | ||||||||||||||||||||||||||||||||||||||||||||
| fn ws_forward_frames<I>( | ||||||||||||||||||||||||||||||||||||||||||||
| metrics: &SendMetrics, | ||||||||||||||||||||||||||||||||||||||||||||
| outgoing_frames: &mpsc::UnboundedSender<Frame>, | ||||||||||||||||||||||||||||||||||||||||||||
| workload: Option<WorkloadType>, | ||||||||||||||||||||||||||||||||||||||||||||
| num_rows: Option<usize>, | ||||||||||||||||||||||||||||||||||||||||||||
| encoded: (EncodeMetrics, InUseSerializeBuffer, I), | ||||||||||||||||||||||||||||||||||||||||||||
| ) -> Result<InUseSerializeBuffer, mpsc::error::SendError<Frame>> | ||||||||||||||||||||||||||||||||||||||||||||
| where | ||||||||||||||||||||||||||||||||||||||||||||
| I: Iterator<Item = Frame>, | ||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+1403
to
+1414
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
(not so important, just a nit) |
||||||||||||||||||||||||||||||||||||||||||||
| let (stats, in_use, frames) = encoded; | ||||||||||||||||||||||||||||||||||||||||||||
| metrics.report(workload, num_rows, stats); | ||||||||||||||||||||||||||||||||||||||||||||
| frames.into_iter().try_for_each(|frame| outgoing_frames.send(frame))?; | ||||||||||||||||||||||||||||||||||||||||||||
| Ok(in_use) | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| /// Some stats about serialization and compression. | ||||||||||||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||||||||||||
| /// Returned by [`ws_encode_message`]. | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1443,21 +1491,29 @@ async fn ws_encode_message( | |||||||||||||||||||||||||||||||||||||||||||
| (metrics, msg_alloc, frames) | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| #[allow(dead_code, unused_variables)] | ||||||||||||||||||||||||||||||||||||||||||||
| async fn ws_encode_message_v2( | ||||||||||||||||||||||||||||||||||||||||||||
| type BinarySerializeFn = fn( | ||||||||||||||||||||||||||||||||||||||||||||
| &BsatnRowListBuilderPool, | ||||||||||||||||||||||||||||||||||||||||||||
| SerializeBuffer, | ||||||||||||||||||||||||||||||||||||||||||||
| ws_v2::ServerMessage, | ||||||||||||||||||||||||||||||||||||||||||||
| ws_v1::Compression, | ||||||||||||||||||||||||||||||||||||||||||||
| ) -> (InUseSerializeBuffer, Bytes); | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| async fn ws_encode_binary_message( | ||||||||||||||||||||||||||||||||||||||||||||
| config: ClientConfig, | ||||||||||||||||||||||||||||||||||||||||||||
| buf: SerializeBuffer, | ||||||||||||||||||||||||||||||||||||||||||||
| message: ws_v2::ServerMessage, | ||||||||||||||||||||||||||||||||||||||||||||
| serialize_message: BinarySerializeFn, | ||||||||||||||||||||||||||||||||||||||||||||
| is_large_message: bool, | ||||||||||||||||||||||||||||||||||||||||||||
| bsatn_rlb_pool: &BsatnRowListBuilderPool, | ||||||||||||||||||||||||||||||||||||||||||||
| ) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator<Item = Frame> + use<>) { | ||||||||||||||||||||||||||||||||||||||||||||
| let start = Instant::now(); | ||||||||||||||||||||||||||||||||||||||||||||
| let compression = config.compression; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| let (in_use, data) = if is_large_message { | ||||||||||||||||||||||||||||||||||||||||||||
| let bsatn_rlb_pool = bsatn_rlb_pool.clone(); | ||||||||||||||||||||||||||||||||||||||||||||
| spawn_rayon(move || serialize_v2(&bsatn_rlb_pool, buf, message, config.compression)).await | ||||||||||||||||||||||||||||||||||||||||||||
| spawn_rayon(move || serialize_message(&bsatn_rlb_pool, buf, message, compression)).await | ||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||
| serialize_v2(bsatn_rlb_pool, buf, message, config.compression) | ||||||||||||||||||||||||||||||||||||||||||||
| serialize_message(bsatn_rlb_pool, buf, message, compression) | ||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| let metrics = EncodeMetrics { | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -2298,9 +2354,11 @@ mod tests { | |||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| #[test] | ||||||||||||||||||||||||||||||||||||||||||||
| fn confirmed_reads_default_depends_on_ws_version() { | ||||||||||||||||||||||||||||||||||||||||||||
| assert!(resolve_confirmed_reads_default(WsVersion::V3, None)); | ||||||||||||||||||||||||||||||||||||||||||||
| assert!(resolve_confirmed_reads_default(WsVersion::V2, None)); | ||||||||||||||||||||||||||||||||||||||||||||
| assert!(!resolve_confirmed_reads_default(WsVersion::V1, None)); | ||||||||||||||||||||||||||||||||||||||||||||
| assert!(resolve_confirmed_reads_default(WsVersion::V1, Some(true))); | ||||||||||||||||||||||||||||||||||||||||||||
| assert!(!resolve_confirmed_reads_default(WsVersion::V3, Some(false))); | ||||||||||||||||||||||||||||||||||||||||||||
| assert!(!resolve_confirmed_reads_default(WsVersion::V2, Some(false))); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| use super::{ClientConnection, DataMessage, MessageHandleError}; | ||
| use serde::de::Error as _; | ||
| use spacetimedb_client_api_messages::websocket::{v2 as ws_v2, v3 as ws_v3}; | ||
| use spacetimedb_lib::bsatn; | ||
| use std::time::Instant; | ||
|
|
||
| pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Instant) -> Result<(), MessageHandleError> { | ||
| client.observe_websocket_request_message(&message); | ||
| let frame = match message { | ||
| DataMessage::Binary(message_buf) => bsatn::from_slice::<ws_v3::ClientFrame>(&message_buf)?, | ||
| DataMessage::Text(_) => { | ||
| return Err(MessageHandleError::TextDecode(serde_json::Error::custom( | ||
| "v3 websocket does not support text messages", | ||
| ))) | ||
| } | ||
| }; | ||
|
|
||
| match frame { | ||
| ws_v3::ClientFrame::Single(message) => { | ||
| let message = bsatn::from_slice::<ws_v2::ClientMessage>(&message)?; | ||
| super::message_handlers_v2::handle_decoded_message(client, message, timer).await?; | ||
| } | ||
| ws_v3::ClientFrame::Batch(messages) => { | ||
| for message in messages { | ||
| let message = bsatn::from_slice::<ws_v2::ClientMessage>(&message)?; | ||
| super::message_handlers_v2::handle_decoded_message(client, message, timer).await?; | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+9
to
+29
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If |
||
|
|
||
| Ok(()) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you prefer adding a new version with a wrapper, rather than adding new message variants to
v2?