From 25763151bd7859a89801580694c8e68ee1f0fd5f Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 8 Apr 2026 23:34:00 -0700 Subject: [PATCH 1/3] Merge durability actors to reduce per-tx scheduler handoffs --- crates/core/src/db/durability.rs | 425 +++++++----------- crates/core/src/db/persistence.rs | 27 +- crates/core/src/db/relational_db.rs | 36 +- .../subscription/module_subscription_actor.rs | 1 + crates/durability/src/imp/local.rs | 151 +++++-- crates/durability/src/lib.rs | 177 +++++++- crates/snapshot/tests/remote.rs | 1 + 7 files changed, 512 insertions(+), 306 deletions(-) diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index 9c78d3d0b33..84c7f9b4f53 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -1,4 +1,4 @@ -use std::{cmp::Reverse, collections::BinaryHeap, iter, num::NonZeroUsize, sync::Arc, time::Duration}; +use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use futures::TryFutureExt as _; use log::{error, info}; @@ -8,9 +8,10 @@ use spacetimedb_commitlog::payload::{ Txdata, }; use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData}; -use spacetimedb_durability::{DurableOffset, Transaction, TxOffset}; +use spacetimedb_durability::Durability as _; +use spacetimedb_durability::{DurableOffset, ReorderWindow, Transaction, TxOffset}; use spacetimedb_lib::Identity; -use thiserror::Error; +use spacetimedb_sats::ProductValue; use tokio::{ runtime, sync::{ @@ -22,7 +23,10 @@ use tokio::{ }; use tracing::{info_span, Instrument as _}; -use crate::{db::persistence::Durability, worker_metrics::WORKER_METRICS}; +use crate::{ + db::{persistence::Durability, relational_db::LocalDurability}, + worker_metrics::WORKER_METRICS, +}; /// A request to persist a transaction or to terminate the actor. pub struct DurabilityRequest { @@ -69,10 +73,19 @@ type ShutdownReply = oneshot::Sender; /// [RelationalDB]: crate::db::relational_db::RelationalDB pub struct DurabilityWorker { database: Identity, - request_tx: Sender, - shutdown: Sender, - durability: Arc, runtime: runtime::Handle, + inner: DurabilityWorkerInner, +} + +enum DurabilityWorkerInner { + Generic { + request_tx: Sender, + shutdown: Sender, + durability: Arc, + }, + Local { + durability: LocalDurability, + }, } impl DurabilityWorker { @@ -107,10 +120,23 @@ impl DurabilityWorker { Self { database, - request_tx, - shutdown: shutdown_tx, - durability, runtime, + inner: DurabilityWorkerInner::Generic { + request_tx, + shutdown: shutdown_tx, + durability, + }, + } + } + + /// Create a [`DurabilityWorker`] that uses the local commitlog durability + /// actor directly. This removes the extra core durability actor so the + /// local path has only one queued background worker. + pub fn new_local(database: Identity, durability: LocalDurability, runtime: runtime::Handle) -> Self { + Self { + database, + runtime, + inner: DurabilityWorkerInner::Local { durability }, } } @@ -123,8 +149,8 @@ impl DurabilityWorker { /// this method is responsible only for reading its decision out of the `tx_data` /// and calling `durability.append_tx`. /// - /// This method sends the work to an actor that collects data and calls `durability.append_tx`. - /// It blocks if the queue is at capacity. + /// This method queues the work for durability processing. + /// It blocks if the active queue is at capacity. /// /// # Panics /// @@ -135,45 +161,74 @@ impl DurabilityWorker { /// - [Self::shutdown] was called /// pub fn request_durability(&self, reducer_context: Option, tx_data: &Arc) { - // We first try to send it without blocking. - match self.request_tx.try_reserve() { - Ok(permit) => { - permit.send(DurabilityRequest { - reducer_context, - tx_data: tx_data.clone(), - }); - } - Err(mpsc::error::TrySendError::Closed(_)) => { - panic!("durability actor vanished database={}", self.database); + match &self.inner { + DurabilityWorkerInner::Generic { request_tx, .. } => { + // We first try to send it without blocking. + match request_tx.try_reserve() { + Ok(permit) => { + permit.send(DurabilityRequest { + reducer_context, + tx_data: tx_data.clone(), + }); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + panic!("durability actor vanished database={}", self.database); + } + Err(mpsc::error::TrySendError::Full(_)) => { + // If the channel was full, we use the blocking version. + let start = std::time::Instant::now(); + let send = || { + request_tx.blocking_send(DurabilityRequest { + reducer_context, + tx_data: tx_data.clone(), + }) + }; + if tokio::runtime::Handle::try_current().is_ok() { + tokio::task::block_in_place(send) + } else { + send() + } + .unwrap_or_else(|_| panic!("durability actor vanished database={}", self.database)); + // We could cache this metric, but if we are already in the blocking code path, + // the extra time of looking up the metric is probably negligible. + WORKER_METRICS + .durability_blocking_send_duration + .with_label_values(&self.database) + .observe(start.elapsed().as_secs_f64()); + } + } } - Err(mpsc::error::TrySendError::Full(_)) => { - // If the channel was full, we use the blocking version. - let start = std::time::Instant::now(); - let send = || { - self.request_tx.blocking_send(DurabilityRequest { - reducer_context, - tx_data: tx_data.clone(), - }) + DurabilityWorkerInner::Local { durability } => { + let Some(tx_offset) = tx_data.tx_offset() else { + let name = reducer_context.as_ref().map(|rcx| &rcx.name); + debug_assert!( + !tx_data.has_rows_or_connect_disconnect(name), + "tx_data has no rows but has connect/disconnect: `{name:?}`" + ); + return; }; - if tokio::runtime::Handle::try_current().is_ok() { - tokio::task::block_in_place(send) - } else { - send() + + let start = std::time::Instant::now(); + let tx_data = tx_data.clone(); + let blocked = durability.append_tx_deferred(tx_offset, move || { + prepare_tx_data_for_durability(tx_offset, reducer_context, &tx_data) + }); + if blocked { + WORKER_METRICS + .durability_blocking_send_duration + .with_label_values(&self.database) + .observe(start.elapsed().as_secs_f64()); } - .unwrap_or_else(|_| panic!("durability actor vanished database={}", self.database)); - // We could cache this metric, but if we are already in the blocking code path, - // the extra time of looking up the metric is probably negligible. - WORKER_METRICS - .durability_blocking_send_duration - .with_label_values(&self.database) - .observe(start.elapsed().as_secs_f64()); } } } /// Get the [`DurableOffset`] of this database. pub fn durable_tx_offset(&self) -> DurableOffset { - self.durability.durable_tx_offset() + match &self.inner { + DurabilityWorkerInner::Generic { durability, .. } => durability.durable_tx_offset(), + DurabilityWorkerInner::Local { durability } => durability.durable_tx_offset(), + } } /// Shut down the worker without dropping it, @@ -187,20 +242,26 @@ impl DurabilityWorker { /// After this method was called, calling [Self::request_durability] /// will panic. pub async fn close(&self) -> Option { - let (done_tx, done_rx) = oneshot::channel(); - // Channel errors can be ignored. - // It just means that the actor already exited. - let _ = self - .shutdown - .send(done_tx) - .map_err(drop) - .and_then(|()| done_rx.map_err(drop)) - .and_then(|done| async move { - done.await; - Ok(()) - }) - .await; - self.durability.close().await + match &self.inner { + DurabilityWorkerInner::Generic { + shutdown, durability, .. + } => { + let (done_tx, done_rx) = oneshot::channel(); + // Channel errors can be ignored. + // It just means that the actor already exited. + let _ = shutdown + .send(done_tx) + .map_err(drop) + .and_then(|()| done_rx.map_err(drop)) + .and_then(|done| async move { + done.await; + Ok(()) + }) + .await; + durability.close().await + } + DurabilityWorkerInner::Local { durability } => durability.close().await, + } } /// Consume `self` and run [Self::close]. @@ -237,78 +298,6 @@ impl DurabilityWorker { } } -#[derive(Debug, Error)] -enum ReorderError { - #[error("reordering window exceeded")] - SizeExceeded, - #[error("transaction offset behind expected offset")] - TxBehind, -} - -/// A bounded collection of elements ordered by [TxOffset], backed by a [BinaryHeap]. -/// -/// This exists to tolerate slightly out-of-order requests. -/// See the struct docs for [DurabilityWorker] for more context. -struct ReorderWindow { - heap: BinaryHeap>>, - next_tx: TxOffset, - max_len: NonZeroUsize, -} - -impl ReorderWindow { - pub fn new(next_tx: TxOffset, max_len: NonZeroUsize) -> Self { - // We expect that requests usually arrive in order, - // so allocate only a single element for the common case. - let heap = BinaryHeap::with_capacity(1); - Self { heap, next_tx, max_len } - } - - /// Push a durability request onto the heap. - /// - /// # Errors - /// - /// The method returns an error if: - /// - /// - the window is full, i.e. `self.len() >= self.max_len` - /// - the `tx_offset` of the request is smaller than the next expected offset - /// - pub fn push(&mut self, req: TxOrdered) -> Result<(), ReorderError> { - if self.len() >= self.max_len.get() { - return Err(ReorderError::SizeExceeded); - } - if req.tx_offset < self.next_tx { - return Err(ReorderError::TxBehind); - } - // We've got an out-of-order request, - // eagerly allocate the max capacity. - if self.len() > 0 { - self.heap.reserve_exact(self.max_len.get()); - } - self.heap.push(Reverse(req)); - - Ok(()) - } - - /// Remove all [DurabilityRequest]s in order, until a gap in the offset - /// sequence is detected or the heap is empty. - pub fn drain(&mut self) -> impl Iterator { - iter::from_fn(|| { - let min_tx_offset = self.heap.peek().map(|Reverse(x)| x.tx_offset); - if min_tx_offset.is_some_and(|tx_offset| tx_offset == self.next_tx) { - let Reverse(TxOrdered { inner: request, .. }) = self.heap.pop().unwrap(); - self.next_tx += 1; - Some(request) - } else { - None - } - }) - } - - pub fn len(&self) -> usize { - self.heap.len() - } -} - pub struct DurabilityWorkerActor { request_rx: mpsc::Receiver, shutdown: Receiver, @@ -354,8 +343,7 @@ impl DurabilityWorkerActor { }, // Otherwise, push to the reordering window. Some(tx_offset) => { - let request = TxOrdered { tx_offset, inner: request }; - if let Err(e) = self.reorder_window.push(request) { + if let Err(e) = self.reorder_window.push(tx_offset, request) { error!("{e}"); break; } @@ -378,74 +366,55 @@ impl DurabilityWorkerActor { let tx_offset = tx_data .tx_offset() .expect("txs without offset should have been dropped"); - - let mut inserts: Box<_> = tx_data - .persistent_inserts() - .map(|(table_id, rowdata)| Ops { table_id, rowdata }) - .collect(); - // What we get from `tx_data` is not necessarily sorted, - // but the durability layer expects by-table_id sorted data. - // Unstable sorts are valid, there will only ever be one entry per table_id. - inserts.sort_unstable_by_key(|ops| ops.table_id); - - let mut deletes: Box<_> = tx_data - .persistent_deletes() - .map(|(table_id, rowdata)| Ops { table_id, rowdata }) - .collect(); - deletes.sort_unstable_by_key(|ops| ops.table_id); - - let mut truncates: Box<[_]> = tx_data.persistent_truncates().collect(); - truncates.sort_unstable_by_key(|table_id| *table_id); - - let inputs = reducer_context.map(|rcx| rcx.into()); - - debug_assert!( - !(inserts.is_empty() && truncates.is_empty() && deletes.is_empty() && inputs.is_none()), - "empty transaction" - ); - - let txdata = Txdata { - inputs, - outputs: None, - mutations: Some(Mutations { - inserts, - deletes, - truncates, - }), - }; - + let tx = prepare_tx_data_for_durability(tx_offset, reducer_context, tx_data); // This does not block, as per trait docs. - durability.append_tx(Transaction { - offset: tx_offset, - txdata, - }); + durability.append_tx(tx); } } -/// Wrapper to sort [DurabilityRequest]s by [TxOffset]. -struct TxOrdered { +fn prepare_tx_data_for_durability( tx_offset: TxOffset, - inner: T, -} - -impl PartialEq for TxOrdered { - fn eq(&self, other: &Self) -> bool { - self.tx_offset == other.tx_offset - } -} - -impl Eq for TxOrdered {} - -#[allow(clippy::non_canonical_partial_ord_impl)] -impl PartialOrd for TxOrdered { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.tx_offset.cmp(&other.tx_offset)) - } -} - -impl Ord for TxOrdered { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.partial_cmp(other).unwrap() + reducer_context: Option, + tx_data: &TxData, +) -> Transaction> { + let mut inserts: Box<_> = tx_data + .persistent_inserts() + .map(|(table_id, rowdata)| Ops { table_id, rowdata }) + .collect(); + // What we get from `tx_data` is not necessarily sorted, + // but the durability layer expects by-table_id sorted data. + // Unstable sorts are valid, there will only ever be one entry per table_id. + inserts.sort_unstable_by_key(|ops| ops.table_id); + + let mut deletes: Box<_> = tx_data + .persistent_deletes() + .map(|(table_id, rowdata)| Ops { table_id, rowdata }) + .collect(); + deletes.sort_unstable_by_key(|ops| ops.table_id); + + let mut truncates: Box<[_]> = tx_data.persistent_truncates().collect(); + truncates.sort_unstable_by_key(|table_id| *table_id); + + let inputs = reducer_context.map(|rcx| rcx.into()); + + debug_assert!( + !(inserts.is_empty() && truncates.is_empty() && deletes.is_empty() && inputs.is_none()), + "empty transaction" + ); + + let txdata = Txdata { + inputs, + outputs: None, + mutations: Some(Mutations { + inserts, + deletes, + truncates, + }), + }; + + Transaction { + offset: tx_offset, + txdata, } } @@ -547,9 +516,9 @@ mod tests { durability.mark_durable(10).await; assert_matches!( - futures::poll!(&mut shutdown_fut), - Poll::Ready(Some(10)), - "shutdown returns, reporting durable offset at 10" + timeout(Duration::from_secs(1), shutdown_fut).await, + Ok(Some(10)), + "shutdown should complete shortly after durable catches up" ); assert_eq!( Some(10), @@ -557,68 +526,4 @@ mod tests { "durability should have appended up to tx offset 10" ); } - - #[test] - fn reorder_window_sorts_by_tx_offset() { - let mut win = ReorderWindow::new(0, NonZeroUsize::new(5).unwrap()); - - for tx_offset in (0..5).rev() { - win.push(TxOrdered { - tx_offset, - inner: tx_offset, - }) - .unwrap(); - } - - let txs = win.drain().collect::>(); - assert_eq!(txs, &[0, 1, 2, 3, 4]); - } - - #[test] - fn reorder_window_stops_drain_at_gap() { - let mut win = ReorderWindow::new(0, NonZeroUsize::new(5).unwrap()); - - win.push(TxOrdered { tx_offset: 4, inner: 4 }).unwrap(); - assert!(win.drain().collect::>().is_empty()); - - for tx_offset in 0..4 { - win.push(TxOrdered { - tx_offset, - inner: tx_offset, - }) - .unwrap(); - } - - let txs = win.drain().collect::>(); - assert_eq!(&txs, &[0, 1, 2, 3, 4]); - } - - #[test] - fn reorder_window_error_when_full() { - let mut win = ReorderWindow::new(0, NonZeroUsize::new(1).unwrap()); - win.push(TxOrdered { - tx_offset: 0, - inner: (), - }) - .unwrap(); - assert_matches!( - win.push(TxOrdered { - tx_offset: 1, - inner: () - }), - Err(ReorderError::SizeExceeded) - ); - } - - #[test] - fn reorder_window_error_on_late_request() { - let mut win = ReorderWindow::new(1, NonZeroUsize::new(5).unwrap()); - assert_matches!( - win.push(TxOrdered { - tx_offset: 0, - inner: () - }), - Err(ReorderError::TxBehind) - ); - } } diff --git a/crates/core/src/db/persistence.rs b/crates/core/src/db/persistence.rs index e837506da38..ed3ffdd2698 100644 --- a/crates/core/src/db/persistence.rs +++ b/crates/core/src/db/persistence.rs @@ -9,7 +9,7 @@ use spacetimedb_snapshot::SnapshotRepository; use crate::{messages::control_db::Database, util::asyncify}; use super::{ - relational_db::{self, Txdata}, + relational_db::{self, LocalDurability, Txdata}, snapshot::{self, SnapshotDatabaseState, SnapshotWorker}, }; @@ -30,6 +30,10 @@ pub type DiskSizeFn = Arc io::Result + Send + Sync>; pub struct Persistence { /// The [Durability] to use, for persisting transactions. pub durability: Arc, + /// The concrete local durability handle, when the database uses the built-in + /// local commitlog path. This allows the core DB layer to bypass helper + /// actors on the local hot path without changing the generic trait object. + pub local_durability: Option, /// The [DiskSizeFn]. /// /// Currently the expectation is that the reported size is the commitlog @@ -55,6 +59,7 @@ impl Persistence { ) -> Self { Self { durability: Arc::new(durability), + local_durability: None, disk_size: Arc::new(disk_size), snapshots, runtime, @@ -83,12 +88,14 @@ impl Persistence { /// Convenience to deconstruct an [Option] into parts. /// - /// Returns `(Some(durability), Some(disk_size), Option, Some(runtime))` - /// if `this` is `Some`, and `(None, None, None, None)` if `this` is `None`. + /// Returns `(Some(durability), local_durability, Some(disk_size), Option, Some(runtime))` + /// if `this` is `Some`, and `(None, None, None, None, None)` if `this` is `None`. + #[allow(clippy::type_complexity)] pub(super) fn unzip( this: Option, ) -> ( Option>, + Option, Option, Option, Option, @@ -96,10 +103,19 @@ impl Persistence { this.map( |Self { durability, + local_durability, disk_size, snapshots, runtime, - }| (Some(durability), Some(disk_size), snapshots, Some(runtime)), + }| { + ( + Some(durability), + local_durability, + Some(disk_size), + snapshots, + Some(runtime), + ) + }, ) .unwrap_or_default() } @@ -159,7 +175,8 @@ impl PersistenceProvider for LocalPersistenceProvider { )); Ok(Persistence { - durability, + durability: durability.clone(), + local_durability: Some(durability), disk_size, snapshots: Some(snapshot_worker), runtime: tokio::runtime::Handle::current(), diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index f1c70bec8ee..f50ad84f4ae 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -122,6 +122,7 @@ pub struct RelationalDB { // this value, later introduction of dynamic configuration will allow the // compiler to find external dependencies. pub const SNAPSHOT_FREQUENCY: u64 = 1_000_000; +const GENERIC_DURABILITY_REORDER_WINDOW_SIZE: NonZeroUsize = NonZeroUsize::new(8).unwrap(); impl std::fmt::Debug for RelationalDB { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -151,17 +152,28 @@ impl RelationalDB { let workload_type_to_exec_counters = Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity))); - let (durability, disk_size_fn, snapshot_worker, rt) = Persistence::unzip(persistence); - let durability = durability.zip(rt).map(|(durability, rt)| { - let next_tx_offset = { - let tx = inner.begin_tx(Workload::Internal); - let next_tx_offset = tx.tx_offset(); - let _ = inner.release_tx(tx); - next_tx_offset.into_inner() - }; - let reorder_window_size = NonZeroUsize::new(8).unwrap(); - DurabilityWorker::new(database_identity, durability, rt, next_tx_offset, reorder_window_size) - }); + let (durability, local_durability, disk_size_fn, snapshot_worker, rt) = Persistence::unzip(persistence); + let durability = match (local_durability, durability, rt) { + (Some(local_durability), _, Some(rt)) => { + Some(DurabilityWorker::new_local(database_identity, local_durability, rt)) + } + (None, Some(durability), Some(rt)) => { + let next_tx_offset = { + let tx = inner.begin_tx(Workload::Internal); + let next_tx_offset = tx.tx_offset(); + let _ = inner.release_tx(tx); + next_tx_offset.into_inner() + }; + Some(DurabilityWorker::new( + database_identity, + durability, + rt, + next_tx_offset, + GENERIC_DURABILITY_REORDER_WINDOW_SIZE, + )) + } + _ => None, + }; Self { inner, @@ -1974,6 +1986,7 @@ pub mod tests_utils { let persistence = Persistence { durability: local.clone(), + local_durability: Some(local.clone()), disk_size: disk_size_fn, snapshots, runtime: rt, @@ -2095,6 +2108,7 @@ pub mod tests_utils { let history = local.as_history(); let persistence = Persistence { durability: local.clone(), + local_durability: Some(local.clone()), disk_size: disk_size_fn, snapshots, runtime: rt, diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 92f296f3b8c..35b39839f8a 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -2043,6 +2043,7 @@ mod tests { EmptyHistory::new(), Some(Persistence { durability: durability.clone(), + local_durability: None, disk_size: Arc::new(|| Ok(<_>::default())), snapshots: None, runtime: rt, diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 743740f6242..f29c461aacd 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -22,7 +22,7 @@ use tokio::{ }; use tracing::{instrument, Span}; -use crate::{Close, Durability, DurableOffset, History, TxOffset}; +use crate::{Close, Durability, DurableOffset, History, ReorderWindow, TxOffset}; pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; @@ -38,7 +38,8 @@ pub struct Options { /// transactions that are currently in the queue, but shrink the buffer to /// `batch_capacity` if it had to make additional space during a burst. /// - /// The internal queue of [Local] is bounded to `2 * batch_capacity`. + /// The internal queue of [Local] is bounded to + /// `Options::QUEUE_CAPACITY_MULTIPLIER * batch_capacity`. /// /// Default: 4096 pub batch_capacity: NonZeroUsize, @@ -48,6 +49,11 @@ pub struct Options { impl Options { pub const DEFAULT_BATCH_CAPACITY: NonZeroUsize = NonZeroUsize::new(4096).unwrap(); + pub const QUEUE_CAPACITY_MULTIPLIER: usize = 4; + + fn queue_capacity(self) -> usize { + Self::QUEUE_CAPACITY_MULTIPLIER * self.batch_capacity.get() + } } impl Default for Options { @@ -69,6 +75,43 @@ pub enum OpenError { type ShutdownReply = oneshot::Sender; +enum QueueItem { + Ready(Transaction>), + Deferred { + tx_offset: TxOffset, + prepare: Box>, + }, +} + +impl QueueItem { + fn tx_offset(&self) -> TxOffset { + match self { + Self::Ready(tx) => tx.offset, + Self::Deferred { tx_offset, .. } => *tx_offset, + } + } + + fn prepare(self) -> Transaction> { + match self { + Self::Ready(tx) => tx, + Self::Deferred { prepare, .. } => prepare.prepare(), + } + } +} + +trait DeferredTx: Send + Sync { + fn prepare(self: Box) -> Transaction>; +} + +impl DeferredTx for F +where + F: FnOnce() -> Transaction> + Send + Sync + 'static, +{ + fn prepare(self: Box) -> Transaction> { + self() + } +} + /// [`Durability`] implementation backed by a [`Commitlog`] on local storage. /// /// The commitlog is constrained to store the canonical [`Txdata`] payload, @@ -89,9 +132,11 @@ pub struct Local { /// Backlog of transactions to be written to disk by the background /// [`PersisterTask`]. /// - /// The queue is bounded to `4 * Option::batch_capacity`. - queue: mpsc::Sender>>, - /// How many transactions are sitting in the `queue`. + /// The queue is bounded to + /// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`. + queue: mpsc::Sender>, + /// How many transactions are pending durability, including items buffered + /// in the queue and items waiting in the reorder window. /// /// This is mainly for observability purposes, and can thus be updated with /// relaxed memory ordering. @@ -128,7 +173,12 @@ impl Local { opts.commitlog, on_new_segment, )?); - let (queue, txdata_rx) = mpsc::channel(4 * opts.batch_capacity.get()); + let next_tx_offset = clog.max_committed_offset().map_or(0, |offset| offset + 1); + let queue_capacity = opts.queue_capacity(); + // The reorder window should be the same size as the queue capacity + let reorder_window_size = + NonZeroUsize::new(queue_capacity).expect("local durability queue capacity is non-zero"); + let (queue, txdata_rx) = mpsc::channel(queue_capacity); let queue_depth = Arc::new(AtomicU64::new(0)); let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); let (shutdown_tx, shutdown_rx) = mpsc::channel(1); @@ -142,6 +192,7 @@ impl Local { queue_depth: queue_depth.clone(), batch_capacity: opts.batch_capacity, + reorder_window: ReorderWindow::new(next_tx_offset, reorder_window_size), lock, } @@ -200,6 +251,7 @@ struct Actor { queue_depth: Arc, batch_capacity: NonZeroUsize, + reorder_window: ReorderWindow>, #[allow(unused)] lock: Lock, @@ -208,8 +260,8 @@ struct Actor { impl Actor { #[instrument(name = "durability::local::actor", skip_all)] async fn run( - self, - mut transactions_rx: mpsc::Receiver>>, + mut self, + mut transactions_rx: mpsc::Receiver>, mut shutdown_rx: mpsc::Receiver>, ) { info!("starting durability actor"); @@ -239,12 +291,31 @@ impl Actor { if n == 0 { break; } - self.queue_depth.fetch_sub(n as u64, Relaxed); + // `recv_many` preserves arrival order, not tx order. + // Sort first so gaps already present in this batch can close immediately. + tx_buf.sort_unstable_by_key(QueueItem::tx_offset); + let ordered = tx_buf.drain(..).map(|item| (item.tx_offset(), item)); + match self.reorder_window.push_batch_ready(ordered) { + Ok(ready) => tx_buf = ready, + Err(e) => { + log::error!("durability actor reorder failure: {e}"); + sync_on_exit = false; + break; + } + } + if tx_buf.is_empty() { + continue; + } + let clog = self.clog.clone(); - tx_buf = spawn_blocking(move || -> io::Result>>> { - for tx in tx_buf.drain(..) { - clog.commit([tx])?; + let ready_len = tx_buf.len(); + self.queue_depth.fetch_sub(ready_len as u64, Relaxed); + tx_buf = spawn_blocking(move || -> io::Result>> { + let mut txs = Vec::with_capacity(tx_buf.len()); + for item in tx_buf.drain(..) { + txs.push(item.prepare()); } + clog.commit(txs)?; Ok(tx_buf) }) .await @@ -330,23 +401,7 @@ impl Durability for Local { type TxData = Txdata; fn append_tx(&self, tx: Transaction) { - match self.queue.try_reserve() { - Ok(permit) => permit.send(tx), - Err(mpsc::error::TrySendError::Closed(_)) => { - panic!("durability actor crashed"); - } - Err(mpsc::error::TrySendError::Full(_)) => { - let send = || self.queue.blocking_send(tx); - if tokio::runtime::Handle::try_current().is_ok() { - tokio::task::block_in_place(send) - } else { - send() - } - .expect("durability actor crashed"); - } - } - - self.queue_depth.fetch_add(1, Relaxed); + let _ = self.send_item(QueueItem::Ready(tx)); } fn durable_tx_offset(&self) -> DurableOffset { @@ -385,6 +440,44 @@ impl Durability for Local { } } +impl Local { + fn send_item(&self, item: QueueItem) -> bool { + let blocked = match self.queue.try_reserve() { + Ok(permit) => { + permit.send(item); + false + } + Err(mpsc::error::TrySendError::Closed(_)) => { + panic!("durability actor crashed"); + } + Err(mpsc::error::TrySendError::Full(_)) => { + let send = || self.queue.blocking_send(item); + if tokio::runtime::Handle::try_current().is_ok() { + tokio::task::block_in_place(send) + } else { + send() + } + .expect("durability actor crashed"); + true + } + }; + + self.queue_depth.fetch_add(1, Relaxed); + blocked + } + + pub fn append_tx_deferred( + &self, + tx_offset: TxOffset, + prepare: impl FnOnce() -> Transaction> + Send + Sync + 'static, + ) -> bool { + self.send_item(QueueItem::Deferred { + tx_offset, + prepare: Box::new(prepare), + }) + } +} + impl History for Commitlog> { type TxData = Txdata; diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 7722d86914a..6a267e9c873 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -1,4 +1,4 @@ -use std::{iter, marker::PhantomData, sync::Arc}; +use std::{cmp::Reverse, collections::BinaryHeap, iter, marker::PhantomData, num::NonZeroUsize, sync::Arc}; use futures::future::BoxFuture; use thiserror::Error; @@ -19,6 +19,114 @@ pub use imp::*; /// of all offsets smaller than it. pub type TxOffset = u64; +#[derive(Debug, Error)] +pub enum ReorderError { + #[error("reordering window exceeded")] + SizeExceeded, + #[error("transaction offset behind expected offset")] + TxBehind, +} + +/// A bounded collection of elements ordered by [TxOffset], backed by a [BinaryHeap]. +/// +/// This exists to tolerate slightly out-of-order transaction requests while +/// still preserving contiguous commit order. +pub struct ReorderWindow { + heap: BinaryHeap>>, + next_tx: TxOffset, + max_len: NonZeroUsize, +} + +impl ReorderWindow { + pub fn new(next_tx: TxOffset, max_len: NonZeroUsize) -> Self { + Self { + heap: BinaryHeap::with_capacity(1), + next_tx, + max_len, + } + } + + pub fn push(&mut self, tx_offset: TxOffset, inner: T) -> Result<(), ReorderError> { + if self.len() >= self.max_len.get() { + return Err(ReorderError::SizeExceeded); + } + self.push_maybe_overfull(TxOrdered { tx_offset, inner }) + } + + pub fn push_batch_ready(&mut self, items: impl IntoIterator) -> Result, ReorderError> { + let mut ready = Vec::new(); + for (tx_offset, inner) in items { + // A drained batch may include both the missing next offset and later offsets + // queued behind it. Fail only if the expected offset is still missing and the + // reorder window exceeds capacity. + self.push_maybe_overfull(TxOrdered { tx_offset, inner })?; + ready.extend(self.drain()); + if self.len() > self.max_len.get() { + return Err(ReorderError::SizeExceeded); + } + } + Ok(ready) + } + + pub fn drain(&mut self) -> impl Iterator + '_ { + iter::from_fn(|| { + let min_tx_offset = self.heap.peek().map(|Reverse(item)| item.tx_offset); + if min_tx_offset.is_some_and(|tx_offset| tx_offset == self.next_tx) { + let Reverse(TxOrdered { inner, .. }) = self.heap.pop().unwrap(); + self.next_tx += 1; + Some(inner) + } else { + None + } + }) + } + + pub fn len(&self) -> usize { + self.heap.len() + } + + pub fn is_empty(&self) -> bool { + self.heap.is_empty() + } + + fn push_maybe_overfull(&mut self, item: TxOrdered) -> Result<(), ReorderError> { + if item.tx_offset < self.next_tx { + return Err(ReorderError::TxBehind); + } + if !self.heap.is_empty() { + self.heap.reserve_exact(self.max_len.get()); + } + self.heap.push(Reverse(item)); + Ok(()) + } +} + +struct TxOrdered { + tx_offset: TxOffset, + inner: T, +} + +impl PartialEq for TxOrdered { + fn eq(&self, other: &Self) -> bool { + self.tx_offset == other.tx_offset + } +} + +impl Eq for TxOrdered {} + +#[allow(clippy::non_canonical_partial_ord_impl)] +impl PartialOrd for TxOrdered { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.tx_offset.cmp(&other.tx_offset)) + } +} + +impl Ord for TxOrdered { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.partial_cmp(other).unwrap() + } +} + #[derive(Debug, Error)] #[error("the database's durability layer went away")] pub struct DurabilityExited; @@ -263,3 +371,70 @@ impl History for EmptyHistory { (0, Some(0)) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn reorder_window_sorts_by_tx_offset() { + let mut win = ReorderWindow::new(0, NonZeroUsize::new(5).unwrap()); + + for tx_offset in (0..5).rev() { + win.push(tx_offset, tx_offset).unwrap(); + } + + let txs = win.drain().collect::>(); + assert_eq!(txs, &[0, 1, 2, 3, 4]); + } + + #[test] + fn reorder_window_stops_drain_at_gap() { + let mut win = ReorderWindow::new(0, NonZeroUsize::new(5).unwrap()); + + win.push(4, 4).unwrap(); + assert!(win.drain().collect::>().is_empty()); + + for tx_offset in 0..4 { + win.push(tx_offset, tx_offset).unwrap(); + } + + let txs = win.drain().collect::>(); + assert_eq!(&txs, &[0, 1, 2, 3, 4]); + } + + #[test] + fn reorder_window_error_when_full() { + let mut win = ReorderWindow::new(0, NonZeroUsize::new(1).unwrap()); + win.push(0, ()).unwrap(); + assert!(matches!(win.push(1, ()), Err(ReorderError::SizeExceeded))); + } + + #[test] + fn reorder_window_error_on_late_request() { + let mut win = ReorderWindow::new(1, NonZeroUsize::new(5).unwrap()); + assert!(matches!(win.push(0, ()), Err(ReorderError::TxBehind))); + } + + #[test] + fn reorder_window_allows_batch_to_close_gap() { + let mut win = ReorderWindow::new(0, NonZeroUsize::new(8).unwrap()); + + let ready = win + .push_batch_ready((0..=8).rev().map(|tx_offset| (tx_offset, tx_offset))) + .unwrap(); + + assert_eq!(ready, (0..=8).collect::>()); + } + + #[test] + fn reorder_window_errors_when_gap_exceeds_capacity() { + let mut win = ReorderWindow::new(0, NonZeroUsize::new(8).unwrap()); + + let err = win + .push_batch_ready((1..=9).map(|tx_offset| (tx_offset, tx_offset))) + .unwrap_err(); + + assert!(matches!(err, ReorderError::SizeExceeded)); + } +} diff --git a/crates/snapshot/tests/remote.rs b/crates/snapshot/tests/remote.rs index 41097b33abd..7ec8aef425f 100644 --- a/crates/snapshot/tests/remote.rs +++ b/crates/snapshot/tests/remote.rs @@ -233,6 +233,7 @@ async fn create_snapshot(repo: Arc) -> anyhow::Result::default())), snapshots: Some(SnapshotWorker::new(repo, snapshot::Compression::Disabled)), runtime: rt, From f68c938ad5fd06c35f7000075682f1717ef496e3 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 9 Apr 2026 17:23:24 -0700 Subject: [PATCH 2/3] Drop datastore lock after requesting durability --- crates/core/src/db/durability.rs | 120 +++--------- crates/core/src/db/persistence.rs | 7 +- crates/core/src/db/relational_db.rs | 42 ++--- crates/core/src/worker_metrics/mod.rs | 5 - .../src/locking_tx_datastore/datastore.rs | 21 +++ .../src/locking_tx_datastore/mut_tx.rs | 31 ++- crates/durability/src/imp/local.rs | 49 +---- crates/durability/src/lib.rs | 177 +----------------- 8 files changed, 99 insertions(+), 353 deletions(-) diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index 84c7f9b4f53..2403568ad06 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -1,15 +1,14 @@ -use std::{num::NonZeroUsize, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use futures::TryFutureExt as _; use log::{error, info}; -use prometheus::IntGauge; use spacetimedb_commitlog::payload::{ txdata::{Mutations, Ops}, Txdata, }; use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData}; use spacetimedb_durability::Durability as _; -use spacetimedb_durability::{DurableOffset, ReorderWindow, Transaction, TxOffset}; +use spacetimedb_durability::{DurableOffset, Transaction, TxOffset}; use spacetimedb_lib::Identity; use spacetimedb_sats::ProductValue; use tokio::{ @@ -39,36 +38,8 @@ type ShutdownReply = oneshot::Sender; /// Represents a handle to a background task that persists transactions /// according to the [`Durability`] policy provided. /// -/// This exists to avoid holding a transaction lock while -/// preparing the [TxData] for processing by the [Durability] layer. -/// -/// The durability worker is internal to [RelationalDB], which calls -/// [DurabilityWorker::request_durability] after committing a transaction. -/// -/// # Transaction ordering -/// -/// The backing datastore of [RelationalDB] is responsible for creating a total -/// ordering of transactions and must uphold that [TxOffset]s are monotonically -/// increasing without gaps. -/// -/// However, [RelationalDB::commit_tx] respectively [RelationalDB::commit_tx_downgrade] -/// may be called from multiple threads. Because those methods are not -/// synchronized, and release the transaction lock before requesting durability, -/// it is possible for [DurabilityRequest]s to appear slightly out-of-order on -/// the worker channel. -/// -/// To mitigate this, the worker keeps a window of up to `reorder_window_size` -/// requests if out-of-order requests are detected, and flushes it to the -/// underlying durability layer once it is able to linearize the offset sequence. -/// -/// Since we expect out-of-order requests to happen very rarely, this measure -/// should not negatively impact throughput in the common case, unlike holding -/// the transaction lock until request submission is complete. -/// -/// Note that the commitlog rejects out-of-order commits, so if a missing offset -/// arrives outside `reorder_window_size` (or never), already committed -/// transactions may be lost (by way of the durability worker crashing). -/// Those transactions will not be confirmed, however, so this is safe. +/// The durability worker is internal to [RelationalDB], which uses +/// [DurabilityWorker::request_durability] while finalizing a transaction. /// /// [RelationalDB]: crate::db::relational_db::RelationalDB pub struct DurabilityWorker { @@ -92,13 +63,7 @@ impl DurabilityWorker { /// Create a new [`DurabilityWorker`] using the given `durability` policy. /// /// Background tasks will be spawned onto to provided tokio `runtime`. - pub fn new( - database: Identity, - durability: Arc, - runtime: runtime::Handle, - next_tx_offset: TxOffset, - reorder_window_size: NonZeroUsize, - ) -> Self { + pub fn new(database: Identity, durability: Arc, runtime: runtime::Handle) -> Self { let (request_tx, request_rx) = channel(4 * 4096); let (shutdown_tx, shutdown_rx) = channel(1); @@ -106,10 +71,6 @@ impl DurabilityWorker { request_rx, shutdown: shutdown_rx, durability: durability.clone(), - reorder_window: ReorderWindow::new(next_tx_offset, reorder_window_size), - reorder_window_len: WORKER_METRICS - .durability_worker_reorder_window_length - .with_label_values(&database), }; let _enter = runtime.enter(); tokio::spawn( @@ -129,9 +90,7 @@ impl DurabilityWorker { } } - /// Create a [`DurabilityWorker`] that uses the local commitlog durability - /// actor directly. This removes the extra core durability actor so the - /// local path has only one queued background worker. + /// Create a [`DurabilityWorker`] that uses the local commitlog durability actor directly. pub fn new_local(database: Identity, durability: LocalDurability, runtime: runtime::Handle) -> Self { Self { database, @@ -161,6 +120,15 @@ impl DurabilityWorker { /// - [Self::shutdown] was called /// pub fn request_durability(&self, reducer_context: Option, tx_data: &Arc) { + let Some(tx_offset) = tx_data.tx_offset() else { + let name = reducer_context.as_ref().map(|rcx| &rcx.name); + debug_assert!( + !tx_data.has_rows_or_connect_disconnect(name), + "tx_data has no rows but has connect/disconnect: `{name:?}`" + ); + return; + }; + match &self.inner { DurabilityWorkerInner::Generic { request_tx, .. } => { // We first try to send it without blocking. @@ -199,20 +167,10 @@ impl DurabilityWorker { } } DurabilityWorkerInner::Local { durability } => { - let Some(tx_offset) = tx_data.tx_offset() else { - let name = reducer_context.as_ref().map(|rcx| &rcx.name); - debug_assert!( - !tx_data.has_rows_or_connect_disconnect(name), - "tx_data has no rows but has connect/disconnect: `{name:?}`" - ); - return; - }; - let start = std::time::Instant::now(); let tx_data = tx_data.clone(); - let blocked = durability.append_tx_deferred(tx_offset, move || { - prepare_tx_data_for_durability(tx_offset, reducer_context, &tx_data) - }); + let blocked = durability + .append_tx_deferred(move || prepare_tx_data_for_durability(tx_offset, reducer_context, &tx_data)); if blocked { WORKER_METRICS .durability_blocking_send_duration @@ -302,8 +260,6 @@ pub struct DurabilityWorkerActor { request_rx: mpsc::Receiver, shutdown: Receiver, durability: Arc, - reorder_window: ReorderWindow, - reorder_window_len: IntGauge, } impl DurabilityWorkerActor { @@ -311,11 +267,8 @@ impl DurabilityWorkerActor { async fn run(mut self) { // When this future completes or is cancelled, ensure that: // - shutdown waiters are notified - // - metrics are reset - let done = scopeguard::guard(Arc::new(Notify::new()), |done| { - done.notify_waiters(); - self.reorder_window_len.set(0); - }); + let done = scopeguard::guard(Arc::new(Notify::new()), |done| done.notify_waiters()); + let mut request_buf = Vec::with_capacity(4096); loop { tokio::select! { @@ -328,35 +281,16 @@ impl DurabilityWorkerActor { let _ = reply.send(done.clone().notified_owned()); }, - req = self.request_rx.recv() => { - let Some(request) = req else { + n = self.request_rx.recv_many(&mut request_buf, usize::MAX) => { + if n == 0 { break; - }; - match request.tx_data.tx_offset() { - // Drop the request if it doesn't have a tx offset. - None => { - let name = request.reducer_context.as_ref().map(|rcx| &rcx.name); - debug_assert!( - !request.tx_data.has_rows_or_connect_disconnect(name), - "tx_data has no rows but has connect/disconnect: `{name:?}`" - ); - }, - // Otherwise, push to the reordering window. - Some(tx_offset) => { - if let Err(e) = self.reorder_window.push(tx_offset, request) { - error!("{e}"); - break; - } - }, } } } - // Drain all requests that are properly ordered. - self.reorder_window - .drain() + request_buf + .drain(..) .for_each(|request| Self::do_durability(&*self.durability, request.reducer_context, &request.tx_data)); - self.reorder_window_len.set(self.reorder_window.len() as _); } info!("durability worker actor done"); @@ -483,13 +417,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn shutdown_waits_until_durable() { let durability = Arc::new(CountingDurability::default()); - let worker = DurabilityWorker::new( - Identity::ONE, - durability.clone(), - runtime::Handle::current(), - 0, - NonZeroUsize::new(1).unwrap(), - ); + let worker = DurabilityWorker::new(Identity::ONE, durability.clone(), runtime::Handle::current()); for i in 0..=10 { let mut txdata = TxData::default(); txdata.set_tx_offset(i); diff --git a/crates/core/src/db/persistence.rs b/crates/core/src/db/persistence.rs index ed3ffdd2698..3f9c17dd98e 100644 --- a/crates/core/src/db/persistence.rs +++ b/crates/core/src/db/persistence.rs @@ -30,9 +30,10 @@ pub type DiskSizeFn = Arc io::Result + Send + Sync>; pub struct Persistence { /// The [Durability] to use, for persisting transactions. pub durability: Arc, - /// The concrete local durability handle, when the database uses the built-in - /// local commitlog path. This allows the core DB layer to bypass helper - /// actors on the local hot path without changing the generic trait object. + /// TODO: Merge this local durability handle with the generic trait object above. + /// This allows us to bypass an actor whose only responsibility is to generate a + /// commitlog payload from `TxData`. Ultimately though this should just be a part + /// of the [Durability] implementation itself. pub local_durability: Option, /// The [DiskSizeFn]. /// diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index f50ad84f4ae..944a43afcd3 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -58,7 +58,6 @@ use spacetimedb_table::table::{RowRef, TableScanIter}; use spacetimedb_table::table_index::IndexKey; use std::borrow::Cow; use std::io; -use std::num::NonZeroUsize; use std::ops::RangeBounds; use std::sync::Arc; use tokio::sync::watch; @@ -122,7 +121,6 @@ pub struct RelationalDB { // this value, later introduction of dynamic configuration will allow the // compiler to find external dependencies. pub const SNAPSHOT_FREQUENCY: u64 = 1_000_000; -const GENERIC_DURABILITY_REORDER_WINDOW_SIZE: NonZeroUsize = NonZeroUsize::new(8).unwrap(); impl std::fmt::Debug for RelationalDB { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -157,21 +155,7 @@ impl RelationalDB { (Some(local_durability), _, Some(rt)) => { Some(DurabilityWorker::new_local(database_identity, local_durability, rt)) } - (None, Some(durability), Some(rt)) => { - let next_tx_offset = { - let tx = inner.begin_tx(Workload::Internal); - let next_tx_offset = tx.tx_offset(); - let _ = inner.release_tx(tx); - next_tx_offset.into_inner() - }; - Some(DurabilityWorker::new( - database_identity, - durability, - rt, - next_tx_offset, - GENERIC_DURABILITY_REORDER_WINDOW_SIZE, - )) - } + (None, Some(durability), Some(rt)) => Some(DurabilityWorker::new(database_identity, durability, rt)), _ => None, }; @@ -826,17 +810,17 @@ impl RelationalDB { let reducer_context = tx.ctx.reducer_context().cloned(); // TODO: Never returns `None` -- should it? - let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else { + let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx_and_then(tx, |tx_data| { + if let Some(durability) = &self.durability { + durability.request_durability(reducer_context, tx_data); + } + })? + else { return Ok(None); }; self.maybe_do_snapshot(&tx_data); - let tx_data = Arc::new(tx_data); - if let Some(durability) = &self.durability { - durability.request_durability(reducer_context, &tx_data); - } - Ok(Some((tx_offset, tx_data, tx_metrics, reducer))) } @@ -844,15 +828,15 @@ impl RelationalDB { pub fn commit_tx_downgrade(&self, tx: MutTx, workload: Workload) -> (Arc, TxMetrics, Tx) { log::trace!("COMMIT MUT TX"); - let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade(tx, workload); + let reducer_context = tx.ctx.reducer_context().cloned(); + let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade_and_then(tx, workload, |tx_data| { + if let Some(durability) = &self.durability { + durability.request_durability(reducer_context, tx_data); + } + }); self.maybe_do_snapshot(&tx_data); - let tx_data = Arc::new(tx_data); - if let Some(durability) = &self.durability { - durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data); - } - (tx_data, tx_metrics, tx) } diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 7f5492a7057..787fd9f6d57 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -491,11 +491,6 @@ metrics_group!( #[labels(database_identity: Identity)] #[buckets(0.001, 0.01, 0.1, 1.0, 10.0)] pub durability_blocking_send_duration: HistogramVec, - - #[name = spacetime_durability_worker_reorder_window_length] - #[help = "The number of transactions currently being held in the reorder window"] - #[labels(db: Identity)] - pub durability_worker_reorder_window_length: IntGaugeVec, } ); diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 53327b55c8b..336476fa6e6 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -968,6 +968,27 @@ impl Locking { pub fn commit_mut_tx_downgrade(&self, tx: MutTxId, workload: Workload) -> (TxData, TxMetrics, TxId) { tx.commit_downgrade(workload) } + + /// Commit `tx` and invoke `before_release` while the write lock is still held. + #[allow(clippy::type_complexity)] + pub fn commit_mut_tx_and_then( + &self, + tx: MutTxId, + before_release: impl FnOnce(&Arc), + ) -> Result, TxMetrics, Option)>> { + Ok(Some(tx.commit_and_then(before_release))) + } + + /// Commit `tx`, invoke `before_downgrade` while the write lock is still held, + /// then downgrade the lock to a read-only transaction. + pub fn commit_mut_tx_downgrade_and_then( + &self, + tx: MutTxId, + workload: Workload, + before_downgrade: impl FnOnce(&Arc), + ) -> (Arc, TxMetrics, TxId) { + tx.commit_downgrade_and_then(workload, before_downgrade) + } } #[derive(Debug, Error)] diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 15772807cba..6e817fbdccd 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1953,7 +1953,17 @@ impl MutTxId { /// - [`TxData`], the set of inserts and deletes performed by this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran during this transaction. - pub(super) fn commit(mut self) -> (TxOffset, TxData, TxMetrics, Option) { + pub(super) fn commit(self) -> (TxOffset, TxData, TxMetrics, Option) { + let (tx_offset, tx_data, tx_metrics, reducer) = self.commit_and_then(|_| {}); + let tx_data = + Arc::try_unwrap(tx_data).unwrap_or_else(|_| panic!("noop commit callback must not retain tx data")); + (tx_offset, tx_data, tx_metrics, reducer) + } + + pub(super) fn commit_and_then( + mut self, + before_release: impl FnOnce(&Arc), + ) -> (TxOffset, Arc, TxMetrics, Option) { let tx_offset = self.committed_state_write_lock.next_tx_offset; let tx_data = self .committed_state_write_lock @@ -1987,6 +1997,9 @@ impl MutTxId { tx_offset }; + let tx_data = Arc::new(tx_data); + before_release(&tx_data); + (tx_offset, tx_data, tx_metrics, reducer) } @@ -2003,7 +2016,18 @@ impl MutTxId { /// - [`TxData`], the set of inserts and deletes performed by this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - [`TxId`], a read-only transaction with a shared lock on the committed state. - pub(super) fn commit_downgrade(mut self, workload: Workload) -> (TxData, TxMetrics, TxId) { + pub(super) fn commit_downgrade(self, workload: Workload) -> (TxData, TxMetrics, TxId) { + let (tx_data, tx_metrics, tx) = self.commit_downgrade_and_then(workload, |_| {}); + let tx_data = + Arc::try_unwrap(tx_data).unwrap_or_else(|_| panic!("noop commit callback must not retain tx data")); + (tx_data, tx_metrics, tx) + } + + pub(super) fn commit_downgrade_and_then( + mut self, + workload: Workload, + before_downgrade: impl FnOnce(&Arc), + ) -> (Arc, TxMetrics, TxId) { let tx_data = self .committed_state_write_lock .merge(self.tx_state, self.read_sets, &self.ctx); @@ -2022,6 +2046,9 @@ impl MutTxId { &self.committed_state_write_lock, ); + let tx_data = Arc::new(tx_data); + before_downgrade(&tx_data); + // Update the workload type of the execution context self.ctx.workload = workload.workload_type(); let tx = TxId { diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index f29c461aacd..2d3de149a7f 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -22,7 +22,7 @@ use tokio::{ }; use tracing::{instrument, Span}; -use crate::{Close, Durability, DurableOffset, History, ReorderWindow, TxOffset}; +use crate::{Close, Durability, DurableOffset, History, TxOffset}; pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; @@ -77,24 +77,14 @@ type ShutdownReply = oneshot::Sender; enum QueueItem { Ready(Transaction>), - Deferred { - tx_offset: TxOffset, - prepare: Box>, - }, + Deferred { prepare: Box> }, } impl QueueItem { - fn tx_offset(&self) -> TxOffset { - match self { - Self::Ready(tx) => tx.offset, - Self::Deferred { tx_offset, .. } => *tx_offset, - } - } - fn prepare(self) -> Transaction> { match self { Self::Ready(tx) => tx, - Self::Deferred { prepare, .. } => prepare.prepare(), + Self::Deferred { prepare } => prepare.prepare(), } } } @@ -136,7 +126,7 @@ pub struct Local { /// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`. queue: mpsc::Sender>, /// How many transactions are pending durability, including items buffered - /// in the queue and items waiting in the reorder window. + /// in the queue and items currently being written by the actor. /// /// This is mainly for observability purposes, and can thus be updated with /// relaxed memory ordering. @@ -173,11 +163,7 @@ impl Local { opts.commitlog, on_new_segment, )?); - let next_tx_offset = clog.max_committed_offset().map_or(0, |offset| offset + 1); let queue_capacity = opts.queue_capacity(); - // The reorder window should be the same size as the queue capacity - let reorder_window_size = - NonZeroUsize::new(queue_capacity).expect("local durability queue capacity is non-zero"); let (queue, txdata_rx) = mpsc::channel(queue_capacity); let queue_depth = Arc::new(AtomicU64::new(0)); let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); @@ -192,7 +178,6 @@ impl Local { queue_depth: queue_depth.clone(), batch_capacity: opts.batch_capacity, - reorder_window: ReorderWindow::new(next_tx_offset, reorder_window_size), lock, } @@ -251,7 +236,6 @@ struct Actor { queue_depth: Arc, batch_capacity: NonZeroUsize, - reorder_window: ReorderWindow>, #[allow(unused)] lock: Lock, @@ -260,7 +244,7 @@ struct Actor { impl Actor { #[instrument(name = "durability::local::actor", skip_all)] async fn run( - mut self, + self, mut transactions_rx: mpsc::Receiver>, mut shutdown_rx: mpsc::Receiver>, ) { @@ -291,18 +275,6 @@ impl Actor { if n == 0 { break; } - // `recv_many` preserves arrival order, not tx order. - // Sort first so gaps already present in this batch can close immediately. - tx_buf.sort_unstable_by_key(QueueItem::tx_offset); - let ordered = tx_buf.drain(..).map(|item| (item.tx_offset(), item)); - match self.reorder_window.push_batch_ready(ordered) { - Ok(ready) => tx_buf = ready, - Err(e) => { - log::error!("durability actor reorder failure: {e}"); - sync_on_exit = false; - break; - } - } if tx_buf.is_empty() { continue; } @@ -311,11 +283,9 @@ impl Actor { let ready_len = tx_buf.len(); self.queue_depth.fetch_sub(ready_len as u64, Relaxed); tx_buf = spawn_blocking(move || -> io::Result>> { - let mut txs = Vec::with_capacity(tx_buf.len()); for item in tx_buf.drain(..) { - txs.push(item.prepare()); + clog.commit([item.prepare()])?; } - clog.commit(txs)?; Ok(tx_buf) }) .await @@ -466,13 +436,8 @@ impl Local { blocked } - pub fn append_tx_deferred( - &self, - tx_offset: TxOffset, - prepare: impl FnOnce() -> Transaction> + Send + Sync + 'static, - ) -> bool { + pub fn append_tx_deferred(&self, prepare: impl FnOnce() -> Transaction> + Send + Sync + 'static) -> bool { self.send_item(QueueItem::Deferred { - tx_offset, prepare: Box::new(prepare), }) } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 6a267e9c873..7722d86914a 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -1,4 +1,4 @@ -use std::{cmp::Reverse, collections::BinaryHeap, iter, marker::PhantomData, num::NonZeroUsize, sync::Arc}; +use std::{iter, marker::PhantomData, sync::Arc}; use futures::future::BoxFuture; use thiserror::Error; @@ -19,114 +19,6 @@ pub use imp::*; /// of all offsets smaller than it. pub type TxOffset = u64; -#[derive(Debug, Error)] -pub enum ReorderError { - #[error("reordering window exceeded")] - SizeExceeded, - #[error("transaction offset behind expected offset")] - TxBehind, -} - -/// A bounded collection of elements ordered by [TxOffset], backed by a [BinaryHeap]. -/// -/// This exists to tolerate slightly out-of-order transaction requests while -/// still preserving contiguous commit order. -pub struct ReorderWindow { - heap: BinaryHeap>>, - next_tx: TxOffset, - max_len: NonZeroUsize, -} - -impl ReorderWindow { - pub fn new(next_tx: TxOffset, max_len: NonZeroUsize) -> Self { - Self { - heap: BinaryHeap::with_capacity(1), - next_tx, - max_len, - } - } - - pub fn push(&mut self, tx_offset: TxOffset, inner: T) -> Result<(), ReorderError> { - if self.len() >= self.max_len.get() { - return Err(ReorderError::SizeExceeded); - } - self.push_maybe_overfull(TxOrdered { tx_offset, inner }) - } - - pub fn push_batch_ready(&mut self, items: impl IntoIterator) -> Result, ReorderError> { - let mut ready = Vec::new(); - for (tx_offset, inner) in items { - // A drained batch may include both the missing next offset and later offsets - // queued behind it. Fail only if the expected offset is still missing and the - // reorder window exceeds capacity. - self.push_maybe_overfull(TxOrdered { tx_offset, inner })?; - ready.extend(self.drain()); - if self.len() > self.max_len.get() { - return Err(ReorderError::SizeExceeded); - } - } - Ok(ready) - } - - pub fn drain(&mut self) -> impl Iterator + '_ { - iter::from_fn(|| { - let min_tx_offset = self.heap.peek().map(|Reverse(item)| item.tx_offset); - if min_tx_offset.is_some_and(|tx_offset| tx_offset == self.next_tx) { - let Reverse(TxOrdered { inner, .. }) = self.heap.pop().unwrap(); - self.next_tx += 1; - Some(inner) - } else { - None - } - }) - } - - pub fn len(&self) -> usize { - self.heap.len() - } - - pub fn is_empty(&self) -> bool { - self.heap.is_empty() - } - - fn push_maybe_overfull(&mut self, item: TxOrdered) -> Result<(), ReorderError> { - if item.tx_offset < self.next_tx { - return Err(ReorderError::TxBehind); - } - if !self.heap.is_empty() { - self.heap.reserve_exact(self.max_len.get()); - } - self.heap.push(Reverse(item)); - Ok(()) - } -} - -struct TxOrdered { - tx_offset: TxOffset, - inner: T, -} - -impl PartialEq for TxOrdered { - fn eq(&self, other: &Self) -> bool { - self.tx_offset == other.tx_offset - } -} - -impl Eq for TxOrdered {} - -#[allow(clippy::non_canonical_partial_ord_impl)] -impl PartialOrd for TxOrdered { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.tx_offset.cmp(&other.tx_offset)) - } -} - -impl Ord for TxOrdered { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.partial_cmp(other).unwrap() - } -} - #[derive(Debug, Error)] #[error("the database's durability layer went away")] pub struct DurabilityExited; @@ -371,70 +263,3 @@ impl History for EmptyHistory { (0, Some(0)) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn reorder_window_sorts_by_tx_offset() { - let mut win = ReorderWindow::new(0, NonZeroUsize::new(5).unwrap()); - - for tx_offset in (0..5).rev() { - win.push(tx_offset, tx_offset).unwrap(); - } - - let txs = win.drain().collect::>(); - assert_eq!(txs, &[0, 1, 2, 3, 4]); - } - - #[test] - fn reorder_window_stops_drain_at_gap() { - let mut win = ReorderWindow::new(0, NonZeroUsize::new(5).unwrap()); - - win.push(4, 4).unwrap(); - assert!(win.drain().collect::>().is_empty()); - - for tx_offset in 0..4 { - win.push(tx_offset, tx_offset).unwrap(); - } - - let txs = win.drain().collect::>(); - assert_eq!(&txs, &[0, 1, 2, 3, 4]); - } - - #[test] - fn reorder_window_error_when_full() { - let mut win = ReorderWindow::new(0, NonZeroUsize::new(1).unwrap()); - win.push(0, ()).unwrap(); - assert!(matches!(win.push(1, ()), Err(ReorderError::SizeExceeded))); - } - - #[test] - fn reorder_window_error_on_late_request() { - let mut win = ReorderWindow::new(1, NonZeroUsize::new(5).unwrap()); - assert!(matches!(win.push(0, ()), Err(ReorderError::TxBehind))); - } - - #[test] - fn reorder_window_allows_batch_to_close_gap() { - let mut win = ReorderWindow::new(0, NonZeroUsize::new(8).unwrap()); - - let ready = win - .push_batch_ready((0..=8).rev().map(|tx_offset| (tx_offset, tx_offset))) - .unwrap(); - - assert_eq!(ready, (0..=8).collect::>()); - } - - #[test] - fn reorder_window_errors_when_gap_exceeds_capacity() { - let mut win = ReorderWindow::new(0, NonZeroUsize::new(8).unwrap()); - - let err = win - .push_batch_ready((1..=9).map(|tx_offset| (tx_offset, tx_offset))) - .unwrap_err(); - - assert!(matches!(err, ReorderError::SizeExceeded)); - } -} From 04b1a24537e9a3ac9f4fe44356df8b0903eff02c Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Fri, 10 Apr 2026 09:50:12 -0700 Subject: [PATCH 3/3] Move payload conversion into Durability trait And remove intermediate worker from generic durability path entirely. --- crates/core/src/db/durability.rs | 446 ++---------------- crates/core/src/db/persistence.rs | 28 +- crates/core/src/db/relational_db.rs | 37 +- .../subscription/module_subscription_actor.rs | 6 +- crates/durability/src/imp/local.rs | 98 ++-- crates/durability/src/imp/mod.rs | 4 +- crates/durability/src/lib.rs | 27 +- crates/durability/tests/io/fallocate.rs | 6 +- crates/snapshot/tests/remote.rs | 1 - 9 files changed, 123 insertions(+), 530 deletions(-) diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index 2403568ad06..c17a10e9f63 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -1,313 +1,54 @@ use std::{sync::Arc, time::Duration}; -use futures::TryFutureExt as _; use log::{error, info}; use spacetimedb_commitlog::payload::{ txdata::{Mutations, Ops}, Txdata, }; use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData}; -use spacetimedb_durability::Durability as _; -use spacetimedb_durability::{DurableOffset, Transaction, TxOffset}; +use spacetimedb_durability::Transaction; use spacetimedb_lib::Identity; use spacetimedb_sats::ProductValue; -use tokio::{ - runtime, - sync::{ - futures::OwnedNotified, - mpsc::{self, channel, Receiver, Sender}, - oneshot, Notify, - }, - time::timeout, -}; -use tracing::{info_span, Instrument as _}; +use tokio::{runtime, time::timeout}; -use crate::{ - db::{persistence::Durability, relational_db::LocalDurability}, - worker_metrics::WORKER_METRICS, -}; +use crate::db::persistence::Durability; -/// A request to persist a transaction or to terminate the actor. -pub struct DurabilityRequest { +pub(super) fn request_durability( + durability: &Durability, reducer_context: Option, - tx_data: Arc, -} - -type ShutdownReply = oneshot::Sender; - -/// Represents a handle to a background task that persists transactions -/// according to the [`Durability`] policy provided. -/// -/// The durability worker is internal to [RelationalDB], which uses -/// [DurabilityWorker::request_durability] while finalizing a transaction. -/// -/// [RelationalDB]: crate::db::relational_db::RelationalDB -pub struct DurabilityWorker { - database: Identity, - runtime: runtime::Handle, - inner: DurabilityWorkerInner, -} - -enum DurabilityWorkerInner { - Generic { - request_tx: Sender, - shutdown: Sender, - durability: Arc, - }, - Local { - durability: LocalDurability, - }, -} - -impl DurabilityWorker { - /// Create a new [`DurabilityWorker`] using the given `durability` policy. - /// - /// Background tasks will be spawned onto to provided tokio `runtime`. - pub fn new(database: Identity, durability: Arc, runtime: runtime::Handle) -> Self { - let (request_tx, request_rx) = channel(4 * 4096); - let (shutdown_tx, shutdown_rx) = channel(1); - - let actor = DurabilityWorkerActor { - request_rx, - shutdown: shutdown_rx, - durability: durability.clone(), - }; - let _enter = runtime.enter(); - tokio::spawn( - actor - .run() - .instrument(info_span!("durability_worker", database = %database)), + tx_data: &Arc, +) { + let Some(tx_offset) = tx_data.tx_offset() else { + let name = reducer_context.as_ref().map(|rcx| &rcx.name); + debug_assert!( + !tx_data.has_rows_or_connect_disconnect(name), + "tx_data has no rows but has connect/disconnect: `{name:?}`" ); - - Self { - database, - runtime, - inner: DurabilityWorkerInner::Generic { - request_tx, - shutdown: shutdown_tx, - durability, - }, - } - } - - /// Create a [`DurabilityWorker`] that uses the local commitlog durability actor directly. - pub fn new_local(database: Identity, durability: LocalDurability, runtime: runtime::Handle) -> Self { - Self { - database, - runtime, - inner: DurabilityWorkerInner::Local { durability }, - } - } - - /// Request that a transaction be made durable. - /// That is, if `(tx_data, ctx)` should be appended to the commitlog, do so. - /// - /// Note that by this stage - /// [`spacetimedb_datastore::locking_tx_datastore::committed_state::tx_consumes_offset`] - /// has already decided based on the reducer and operations whether the transaction should be appended; - /// this method is responsible only for reading its decision out of the `tx_data` - /// and calling `durability.append_tx`. - /// - /// This method queues the work for durability processing. - /// It blocks if the active queue is at capacity. - /// - /// # Panics - /// - /// Panics if the durability worker has already closed the receive end of - /// its queue. This may happen if - /// - /// - the backing [Durability] has panicked, or - /// - [Self::shutdown] was called - /// - pub fn request_durability(&self, reducer_context: Option, tx_data: &Arc) { - let Some(tx_offset) = tx_data.tx_offset() else { - let name = reducer_context.as_ref().map(|rcx| &rcx.name); - debug_assert!( - !tx_data.has_rows_or_connect_disconnect(name), - "tx_data has no rows but has connect/disconnect: `{name:?}`" - ); - return; - }; - - match &self.inner { - DurabilityWorkerInner::Generic { request_tx, .. } => { - // We first try to send it without blocking. - match request_tx.try_reserve() { - Ok(permit) => { - permit.send(DurabilityRequest { - reducer_context, - tx_data: tx_data.clone(), - }); - } - Err(mpsc::error::TrySendError::Closed(_)) => { - panic!("durability actor vanished database={}", self.database); - } - Err(mpsc::error::TrySendError::Full(_)) => { - // If the channel was full, we use the blocking version. - let start = std::time::Instant::now(); - let send = || { - request_tx.blocking_send(DurabilityRequest { - reducer_context, - tx_data: tx_data.clone(), - }) - }; - if tokio::runtime::Handle::try_current().is_ok() { - tokio::task::block_in_place(send) - } else { - send() - } - .unwrap_or_else(|_| panic!("durability actor vanished database={}", self.database)); - // We could cache this metric, but if we are already in the blocking code path, - // the extra time of looking up the metric is probably negligible. - WORKER_METRICS - .durability_blocking_send_duration - .with_label_values(&self.database) - .observe(start.elapsed().as_secs_f64()); - } - } - } - DurabilityWorkerInner::Local { durability } => { - let start = std::time::Instant::now(); - let tx_data = tx_data.clone(); - let blocked = durability - .append_tx_deferred(move || prepare_tx_data_for_durability(tx_offset, reducer_context, &tx_data)); - if blocked { - WORKER_METRICS - .durability_blocking_send_duration - .with_label_values(&self.database) - .observe(start.elapsed().as_secs_f64()); - } - } - } - } - - /// Get the [`DurableOffset`] of this database. - pub fn durable_tx_offset(&self) -> DurableOffset { - match &self.inner { - DurabilityWorkerInner::Generic { durability, .. } => durability.durable_tx_offset(), - DurabilityWorkerInner::Local { durability } => durability.durable_tx_offset(), - } - } - - /// Shut down the worker without dropping it, - /// flushing outstanding transaction. - /// - /// Closes the internal channel, then waits for the [DurableOffset] to - /// report the offset of the most recently enqueued transaction as durable. - /// - /// # Panics - /// - /// After this method was called, calling [Self::request_durability] - /// will panic. - pub async fn close(&self) -> Option { - match &self.inner { - DurabilityWorkerInner::Generic { - shutdown, durability, .. - } => { - let (done_tx, done_rx) = oneshot::channel(); - // Channel errors can be ignored. - // It just means that the actor already exited. - let _ = shutdown - .send(done_tx) - .map_err(drop) - .and_then(|()| done_rx.map_err(drop)) - .and_then(|done| async move { - done.await; - Ok(()) - }) - .await; - durability.close().await - } - DurabilityWorkerInner::Local { durability } => durability.close().await, - } - } - - /// Consume `self` and run [Self::close]. - /// - /// The `lock_file` is not dropped until the shutdown is complete (either - /// successfully or unsuccessfully). This is to prevent the database to be - /// re-opened for writing while there is still an active background task - /// writing to the commitlog. - /// - /// The shutdown task will be spawned onto the tokio runtime provided to - /// [Self::new]. This means that the task may still be running when this - /// method returns. - /// - /// `database_identity` is used to associate log records with the database - /// owning this durability worker. - /// - /// This method is used in the `Drop` impl for [crate::db::relational_db::RelationalDB]. - pub(super) fn spawn_close(self, database_identity: Identity) { - let rt = self.runtime.clone(); - rt.spawn(async move { - let label = format!("[{database_identity}]"); - // Apply a timeout, in case `Durability::close` doesn't terminate - // as advertised. This is a bug, but panicking here would not - // unwind at the call site. - match timeout(Duration::from_secs(10), self.close()).await { - Err(_elapsed) => { - error!("{label} timeout waiting for durability worker shutdown"); - } - Ok(offset) => { - info!("{label} durability worker shut down at tx offset: {offset:?}"); - } - } - }); - } -} - -pub struct DurabilityWorkerActor { - request_rx: mpsc::Receiver, - shutdown: Receiver, - durability: Arc, + return; + }; + let tx_data = tx_data.clone(); + durability.append_tx(Box::new(move || { + prepare_tx_data_for_durability(tx_offset, reducer_context, &tx_data) + })); } -impl DurabilityWorkerActor { - /// Processes requests to do durability. - async fn run(mut self) { - // When this future completes or is cancelled, ensure that: - // - shutdown waiters are notified - let done = scopeguard::guard(Arc::new(Notify::new()), |done| done.notify_waiters()); - let mut request_buf = Vec::with_capacity(4096); - - loop { - tokio::select! { - // Biased towards the shutdown channel, - // so that adding new requests is prevented promptly. - biased; - - Some(reply) = self.shutdown.recv() => { - self.request_rx.close(); - let _ = reply.send(done.clone().notified_owned()); - }, - - n = self.request_rx.recv_many(&mut request_buf, usize::MAX) => { - if n == 0 { - break; - } - } +pub(super) fn spawn_close(durability: Arc, runtime: &runtime::Handle, database_identity: Identity) { + let rt = runtime.clone(); + rt.spawn(async move { + let label = format!("[{database_identity}]"); + match timeout(Duration::from_secs(10), durability.close()).await { + Err(_elapsed) => { + error!("{label} timeout waiting for durability shutdown"); + } + Ok(offset) => { + info!("{label} durability shut down at tx offset: {offset:?}"); } - - request_buf - .drain(..) - .for_each(|request| Self::do_durability(&*self.durability, request.reducer_context, &request.tx_data)); } - - info!("durability worker actor done"); - } - - pub fn do_durability(durability: &Durability, reducer_context: Option, tx_data: &TxData) { - let tx_offset = tx_data - .tx_offset() - .expect("txs without offset should have been dropped"); - let tx = prepare_tx_data_for_durability(tx_offset, reducer_context, tx_data); - // This does not block, as per trait docs. - durability.append_tx(tx); - } + }); } fn prepare_tx_data_for_durability( - tx_offset: TxOffset, + tx_offset: u64, reducer_context: Option, tx_data: &TxData, ) -> Transaction> { @@ -315,9 +56,6 @@ fn prepare_tx_data_for_durability( .persistent_inserts() .map(|(table_id, rowdata)| Ops { table_id, rowdata }) .collect(); - // What we get from `tx_data` is not necessarily sorted, - // but the durability layer expects by-table_id sorted data. - // Unstable sorts are valid, there will only ever be one entry per table_id. inserts.sort_unstable_by_key(|ops| ops.table_id); let mut deletes: Box<_> = tx_data @@ -336,122 +74,16 @@ fn prepare_tx_data_for_durability( "empty transaction" ); - let txdata = Txdata { - inputs, - outputs: None, - mutations: Some(Mutations { - inserts, - deletes, - truncates, - }), - }; - Transaction { offset: tx_offset, - txdata, - } -} - -#[cfg(test)] -mod tests { - use std::{pin::pin, task::Poll}; - - use futures::FutureExt as _; - use pretty_assertions::assert_matches; - use spacetimedb_sats::product; - use spacetimedb_schema::table_name::TableName; - use tokio::sync::watch; - - use super::*; - use crate::db::relational_db::Txdata; - - #[derive(Default)] - struct CountingDurability { - appended: watch::Sender>, - durable: watch::Sender>, - } - - impl CountingDurability { - async fn mark_durable(&self, offset: TxOffset) { - self.appended - .subscribe() - .wait_for(|x| x.is_some_and(|appended_offset| appended_offset >= offset)) - .await - .unwrap(); - self.durable.send_modify(|durable_offset| { - durable_offset.replace(offset); - }); - } - } - - impl spacetimedb_durability::Durability for CountingDurability { - type TxData = Txdata; - - fn append_tx(&self, tx: Transaction) { - self.appended.send_modify(|offset| { - offset.replace(tx.offset); - }); - } - - fn durable_tx_offset(&self) -> DurableOffset { - self.durable.subscribe().into() - } - - fn close(&self) -> spacetimedb_durability::Close { - let mut durable = self.durable.subscribe(); - let appended = self.appended.subscribe(); - async move { - let durable_offset = durable - .wait_for(|durable| match (*durable).zip(*appended.borrow()) { - Some((durable_offset, appended_offset)) => durable_offset >= appended_offset, - None => false, - }) - .await - .unwrap(); - *durable_offset - } - .boxed() - } - } - - #[tokio::test(flavor = "multi_thread")] - async fn shutdown_waits_until_durable() { - let durability = Arc::new(CountingDurability::default()); - let worker = DurabilityWorker::new(Identity::ONE, durability.clone(), runtime::Handle::current()); - for i in 0..=10 { - let mut txdata = TxData::default(); - txdata.set_tx_offset(i); - // Ensure the transaction is non-empty. - txdata.set_inserts_for_table(4000.into(), &TableName::for_test("foo"), [product![42u8]].into()); - - worker.request_durability(None, &Arc::new(txdata)); - } - - let shutdown = worker.close(); - let mut shutdown_fut = pin!(shutdown); - assert_matches!( - futures::poll!(&mut shutdown_fut), - Poll::Pending, - "shutdown should be pending because requested > durable" - ); - - durability.mark_durable(5).await; - assert_matches!( - futures::poll!(&mut shutdown_fut), - Poll::Pending, - "shutdown should be pending because requested > durable" - ); - - durability.mark_durable(10).await; - assert_matches!( - timeout(Duration::from_secs(1), shutdown_fut).await, - Ok(Some(10)), - "shutdown should complete shortly after durable catches up" - ); - assert_eq!( - Some(10), - *durability.appended.borrow(), - "durability should have appended up to tx offset 10" - ); + txdata: Txdata { + inputs, + outputs: None, + mutations: Some(Mutations { + inserts, + deletes, + truncates, + }), + }, } } diff --git a/crates/core/src/db/persistence.rs b/crates/core/src/db/persistence.rs index 3f9c17dd98e..e837506da38 100644 --- a/crates/core/src/db/persistence.rs +++ b/crates/core/src/db/persistence.rs @@ -9,7 +9,7 @@ use spacetimedb_snapshot::SnapshotRepository; use crate::{messages::control_db::Database, util::asyncify}; use super::{ - relational_db::{self, LocalDurability, Txdata}, + relational_db::{self, Txdata}, snapshot::{self, SnapshotDatabaseState, SnapshotWorker}, }; @@ -30,11 +30,6 @@ pub type DiskSizeFn = Arc io::Result + Send + Sync>; pub struct Persistence { /// The [Durability] to use, for persisting transactions. pub durability: Arc, - /// TODO: Merge this local durability handle with the generic trait object above. - /// This allows us to bypass an actor whose only responsibility is to generate a - /// commitlog payload from `TxData`. Ultimately though this should just be a part - /// of the [Durability] implementation itself. - pub local_durability: Option, /// The [DiskSizeFn]. /// /// Currently the expectation is that the reported size is the commitlog @@ -60,7 +55,6 @@ impl Persistence { ) -> Self { Self { durability: Arc::new(durability), - local_durability: None, disk_size: Arc::new(disk_size), snapshots, runtime, @@ -89,14 +83,12 @@ impl Persistence { /// Convenience to deconstruct an [Option] into parts. /// - /// Returns `(Some(durability), local_durability, Some(disk_size), Option, Some(runtime))` - /// if `this` is `Some`, and `(None, None, None, None, None)` if `this` is `None`. - #[allow(clippy::type_complexity)] + /// Returns `(Some(durability), Some(disk_size), Option, Some(runtime))` + /// if `this` is `Some`, and `(None, None, None, None)` if `this` is `None`. pub(super) fn unzip( this: Option, ) -> ( Option>, - Option, Option, Option, Option, @@ -104,19 +96,10 @@ impl Persistence { this.map( |Self { durability, - local_durability, disk_size, snapshots, runtime, - }| { - ( - Some(durability), - local_durability, - Some(disk_size), - snapshots, - Some(runtime), - ) - }, + }| (Some(durability), Some(disk_size), snapshots, Some(runtime)), ) .unwrap_or_default() } @@ -176,8 +159,7 @@ impl PersistenceProvider for LocalPersistenceProvider { )); Ok(Persistence { - durability: durability.clone(), - local_durability: Some(durability), + durability, disk_size, snapshots: Some(snapshot_worker), runtime: tokio::runtime::Handle::current(), diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 944a43afcd3..e2ec35fcccf 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,4 +1,4 @@ -use crate::db::durability::DurabilityWorker; +use crate::db::durability::{request_durability, spawn_close as spawn_durability_close}; use crate::db::MetricsRecorderQueue; use crate::error::{DBError, RestoreSnapshotError}; use crate::subscription::ExecutionCounters; @@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk}; use spacetimedb_data_structures::map::HashSet; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError}; -use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; +use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView, @@ -98,7 +98,8 @@ pub struct RelationalDB { owner_identity: Identity, inner: Locking, - durability: Option, + durability: Option>, + durability_runtime: Option, snapshot_worker: Option, row_count_fn: RowCountFn, @@ -133,8 +134,8 @@ impl std::fmt::Debug for RelationalDB { impl Drop for RelationalDB { fn drop(&mut self) { // Attempt to flush the outstanding transactions. - if let Some(worker) = self.durability.take() { - worker.spawn_close(self.database_identity); + if let (Some(durability), Some(runtime)) = (self.durability.take(), self.durability_runtime.take()) { + spawn_durability_close(durability, &runtime, self.database_identity); } } } @@ -150,18 +151,12 @@ impl RelationalDB { let workload_type_to_exec_counters = Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity))); - let (durability, local_durability, disk_size_fn, snapshot_worker, rt) = Persistence::unzip(persistence); - let durability = match (local_durability, durability, rt) { - (Some(local_durability), _, Some(rt)) => { - Some(DurabilityWorker::new_local(database_identity, local_durability, rt)) - } - (None, Some(durability), Some(rt)) => Some(DurabilityWorker::new(database_identity, durability, rt)), - _ => None, - }; + let (durability, disk_size_fn, snapshot_worker, durability_runtime) = Persistence::unzip(persistence); Self { inner, durability, + durability_runtime, snapshot_worker, database_identity, @@ -811,9 +806,7 @@ impl RelationalDB { let reducer_context = tx.ctx.reducer_context().cloned(); // TODO: Never returns `None` -- should it? let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx_and_then(tx, |tx_data| { - if let Some(durability) = &self.durability { - durability.request_durability(reducer_context, tx_data); - } + self.request_durability(reducer_context, tx_data); })? else { return Ok(None); @@ -830,9 +823,7 @@ impl RelationalDB { let reducer_context = tx.ctx.reducer_context().cloned(); let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade_and_then(tx, workload, |tx_data| { - if let Some(durability) = &self.durability { - durability.request_durability(reducer_context, tx_data); - } + self.request_durability(reducer_context, tx_data); }); self.maybe_do_snapshot(&tx_data); @@ -848,6 +839,12 @@ impl RelationalDB { .map(|durability| durability.durable_tx_offset()) } + fn request_durability(&self, reducer_context: Option, tx_data: &Arc) { + if let Some(durability) = &self.durability { + request_durability(durability.as_ref(), reducer_context, tx_data); + } + } + /// Decide based on the `committed_state.next_tx_offset` /// whether to request that the [`SnapshotWorker`] in `self` capture a snapshot of the database. /// @@ -1970,7 +1967,6 @@ pub mod tests_utils { let persistence = Persistence { durability: local.clone(), - local_durability: Some(local.clone()), disk_size: disk_size_fn, snapshots, runtime: rt, @@ -2092,7 +2088,6 @@ pub mod tests_utils { let history = local.as_history(); let persistence = Persistence { durability: local.clone(), - local_durability: Some(local.clone()), disk_size: disk_size_fn, snapshots, runtime: rt, diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 35b39839f8a..301bbc47767 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1900,7 +1900,7 @@ mod tests { use spacetimedb_commitlog::{commitlog, repo}; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap}; use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; - use spacetimedb_durability::{Durability, EmptyHistory, Transaction, TxOffset}; + use spacetimedb_durability::{Durability, EmptyHistory, TxOffset}; use spacetimedb_execution::dml::MutDatastore; use spacetimedb_lib::bsatn::ToBsatn; use spacetimedb_lib::db::auth::StAccess; @@ -1987,7 +1987,8 @@ mod tests { impl Durability for ManualDurability { type TxData = Txdata; - fn append_tx(&self, tx: Transaction) { + fn append_tx(&self, tx: spacetimedb_durability::PreparedTx) { + let tx = tx.into_transaction(); let mut commitlog = self.commitlog.write().unwrap(); commitlog.commit([tx]).expect("commit failed"); commitlog.flush().expect("error flushing commitlog"); @@ -2043,7 +2044,6 @@ mod tests { EmptyHistory::new(), Some(Persistence { durability: durability.clone(), - local_durability: None, disk_size: Arc::new(|| Ok(<_>::default())), snapshots: None, runtime: rt, diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 2d3de149a7f..43388128eeb 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -22,7 +22,7 @@ use tokio::{ }; use tracing::{instrument, Span}; -use crate::{Close, Durability, DurableOffset, History, TxOffset}; +use crate::{Close, Durability, DurableOffset, History, PreparedTx, TxOffset}; pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; @@ -75,33 +75,6 @@ pub enum OpenError { type ShutdownReply = oneshot::Sender; -enum QueueItem { - Ready(Transaction>), - Deferred { prepare: Box> }, -} - -impl QueueItem { - fn prepare(self) -> Transaction> { - match self { - Self::Ready(tx) => tx, - Self::Deferred { prepare } => prepare.prepare(), - } - } -} - -trait DeferredTx: Send + Sync { - fn prepare(self: Box) -> Transaction>; -} - -impl DeferredTx for F -where - F: FnOnce() -> Transaction> + Send + Sync + 'static, -{ - fn prepare(self: Box) -> Transaction> { - self() - } -} - /// [`Durability`] implementation backed by a [`Commitlog`] on local storage. /// /// The commitlog is constrained to store the canonical [`Txdata`] payload, @@ -124,7 +97,7 @@ pub struct Local { /// /// The queue is bounded to /// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`. - queue: mpsc::Sender>, + queue: mpsc::Sender>>, /// How many transactions are pending durability, including items buffered /// in the queue and items currently being written by the actor. /// @@ -245,7 +218,7 @@ impl Actor { #[instrument(name = "durability::local::actor", skip_all)] async fn run( self, - mut transactions_rx: mpsc::Receiver>, + mut transactions_rx: mpsc::Receiver>>, mut shutdown_rx: mpsc::Receiver>, ) { info!("starting durability actor"); @@ -282,9 +255,9 @@ impl Actor { let clog = self.clog.clone(); let ready_len = tx_buf.len(); self.queue_depth.fetch_sub(ready_len as u64, Relaxed); - tx_buf = spawn_blocking(move || -> io::Result>> { - for item in tx_buf.drain(..) { - clog.commit([item.prepare()])?; + tx_buf = spawn_blocking(move || -> io::Result>>> { + for tx in tx_buf.drain(..) { + clog.commit([tx.into_transaction()])?; } Ok(tx_buf) }) @@ -370,8 +343,30 @@ impl Drop for Lock { impl Durability for Local { type TxData = Txdata; - fn append_tx(&self, tx: Transaction) { - let _ = self.send_item(QueueItem::Ready(tx)); + fn append_tx(&self, tx: PreparedTx) { + let mut tx = Some(tx); + let blocked = match self.queue.try_reserve() { + Ok(permit) => { + permit.send(tx.take().expect("tx already sent")); + false + } + Err(mpsc::error::TrySendError::Closed(_)) => { + panic!("durability actor crashed"); + } + Err(mpsc::error::TrySendError::Full(_)) => { + let mut send = || self.queue.blocking_send(tx.take().expect("tx already sent")); + if tokio::runtime::Handle::try_current().is_ok() { + tokio::task::block_in_place(send) + } else { + send() + } + .expect("durability actor crashed"); + true + } + }; + + self.queue_depth.fetch_add(1, Relaxed); + let _ = blocked; } fn durable_tx_offset(&self) -> DurableOffset { @@ -410,39 +405,6 @@ impl Durability for Local { } } -impl Local { - fn send_item(&self, item: QueueItem) -> bool { - let blocked = match self.queue.try_reserve() { - Ok(permit) => { - permit.send(item); - false - } - Err(mpsc::error::TrySendError::Closed(_)) => { - panic!("durability actor crashed"); - } - Err(mpsc::error::TrySendError::Full(_)) => { - let send = || self.queue.blocking_send(item); - if tokio::runtime::Handle::try_current().is_ok() { - tokio::task::block_in_place(send) - } else { - send() - } - .expect("durability actor crashed"); - true - } - }; - - self.queue_depth.fetch_add(1, Relaxed); - blocked - } - - pub fn append_tx_deferred(&self, prepare: impl FnOnce() -> Transaction> + Send + Sync + 'static) -> bool { - self.send_item(QueueItem::Deferred { - prepare: Box::new(prepare), - }) - } -} - impl History for Commitlog> { type TxData = Txdata; diff --git a/crates/durability/src/imp/mod.rs b/crates/durability/src/imp/mod.rs index 1636e05cc51..3e00ae21ee1 100644 --- a/crates/durability/src/imp/mod.rs +++ b/crates/durability/src/imp/mod.rs @@ -15,7 +15,7 @@ mod testing { use futures::FutureExt as _; use tokio::sync::watch; - use crate::{Close, Durability, DurableOffset, Transaction, TxOffset}; + use crate::{Close, Durability, DurableOffset, PreparedTx, TxOffset}; /// A [`Durability`] impl that sends all transactions into the void. /// @@ -41,7 +41,7 @@ mod testing { impl Durability for NoDurability { type TxData = T; - fn append_tx(&self, _: Transaction) { + fn append_tx(&self, _: PreparedTx) { if self.closed.load(Ordering::Relaxed) { panic!("`close` was called on this `NoDurability` instance"); } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 7722d86914a..2b2aadcbbfb 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -91,6 +91,29 @@ impl From>> for DurableOffset { /// can be used as a trait object without knowing the type of the `close` future. pub type Close = BoxFuture<'static, Option>; +/// Object-safe conversion into an owned [Transaction]. +pub trait IntoTransaction: Send { + fn into_transaction(self: Box) -> Transaction; +} + +impl IntoTransaction for Transaction { + fn into_transaction(self: Box) -> Transaction { + *self + } +} + +impl IntoTransaction for F +where + T: Send + 'static, + F: FnOnce() -> Transaction + Send + 'static, +{ + fn into_transaction(self: Box) -> Transaction { + self() + } +} + +pub type PreparedTx = Box>; + /// The durability API. /// /// NOTE: This is a preliminary definition, still under consideration. @@ -105,7 +128,7 @@ pub trait Durability: Send + Sync { /// The payload representing a single transaction. type TxData; - /// Submit a [Transaction] to be made durable. + /// Submit work that yields a [Transaction] to be made durable. /// /// This method must never block, and accept new transactions even if they /// cannot be made durable immediately. @@ -121,7 +144,7 @@ pub trait Durability: Send + Sync { // (i.e. a torn write will corrupt all transactions contained in it), and it // is very unclear when it is both correct and beneficial to bundle more // than a single transaction into a commit. - fn append_tx(&self, tx: Transaction); + fn append_tx(&self, tx: PreparedTx); /// Obtain a handle to the [DurableOffset]. fn durable_tx_offset(&self) -> DurableOffset; diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs index b4c6dbd1a7b..05695e20302 100644 --- a/crates/durability/tests/io/fallocate.rs +++ b/crates/durability/tests/io/fallocate.rs @@ -36,7 +36,7 @@ use spacetimedb_commitlog::{ segment, tests::helpers::enable_logging, }; -use spacetimedb_durability::{local::OpenError, Durability, Txdata}; +use spacetimedb_durability::{local::OpenError, Durability, Transaction, Txdata}; use spacetimedb_paths::{server::ReplicaDir, FromPathUnchecked}; use tempfile::{NamedTempFile, TempDir}; use tokio::{sync::watch, time::sleep}; @@ -99,7 +99,7 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { new_segment_rx.borrow_and_update(); // Write past available space. for offset in 0..256 { - durability.append_tx((offset, txdata.clone()).into()); + durability.append_tx(Box::new(Transaction::from((offset, txdata.clone())))); } // Ensure new segment is created. new_segment_rx.changed().await?; @@ -107,7 +107,7 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { sleep(Duration::from_millis(5)).await; // Durability actor should have crashed, so this should panic. info!("trying append on crashed durability"); - durability.append_tx((256, txdata.clone()).into()); + durability.append_tx(Box::new(Transaction::from((256, txdata.clone())))); } Ok(()) diff --git a/crates/snapshot/tests/remote.rs b/crates/snapshot/tests/remote.rs index 7ec8aef425f..41097b33abd 100644 --- a/crates/snapshot/tests/remote.rs +++ b/crates/snapshot/tests/remote.rs @@ -233,7 +233,6 @@ async fn create_snapshot(repo: Arc) -> anyhow::Result::default())), snapshots: Some(SnapshotWorker::new(repo, snapshot::Compression::Disabled)), runtime: rt,