diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index 9c78d3d0b33..c17a10e9f63 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -1,410 +1,82 @@ -use std::{cmp::Reverse, collections::BinaryHeap, iter, num::NonZeroUsize, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; -use futures::TryFutureExt as _; use log::{error, info}; -use prometheus::IntGauge; use spacetimedb_commitlog::payload::{ txdata::{Mutations, Ops}, Txdata, }; use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData}; -use spacetimedb_durability::{DurableOffset, Transaction, TxOffset}; +use spacetimedb_durability::Transaction; use spacetimedb_lib::Identity; -use thiserror::Error; -use tokio::{ - runtime, - sync::{ - futures::OwnedNotified, - mpsc::{self, channel, Receiver, Sender}, - oneshot, Notify, - }, - time::timeout, -}; -use tracing::{info_span, Instrument as _}; +use spacetimedb_sats::ProductValue; +use tokio::{runtime, time::timeout}; -use crate::{db::persistence::Durability, worker_metrics::WORKER_METRICS}; +use crate::db::persistence::Durability; -/// A request to persist a transaction or to terminate the actor. -pub struct DurabilityRequest { +pub(super) fn request_durability( + durability: &Durability, reducer_context: Option, - tx_data: Arc, -} - -type ShutdownReply = oneshot::Sender; - -/// Represents a handle to a background task that persists transactions -/// according to the [`Durability`] policy provided. -/// -/// This exists to avoid holding a transaction lock while -/// preparing the [TxData] for processing by the [Durability] layer. -/// -/// The durability worker is internal to [RelationalDB], which calls -/// [DurabilityWorker::request_durability] after committing a transaction. -/// -/// # Transaction ordering -/// -/// The backing datastore of [RelationalDB] is responsible for creating a total -/// ordering of transactions and must uphold that [TxOffset]s are monotonically -/// increasing without gaps. -/// -/// However, [RelationalDB::commit_tx] respectively [RelationalDB::commit_tx_downgrade] -/// may be called from multiple threads. Because those methods are not -/// synchronized, and release the transaction lock before requesting durability, -/// it is possible for [DurabilityRequest]s to appear slightly out-of-order on -/// the worker channel. -/// -/// To mitigate this, the worker keeps a window of up to `reorder_window_size` -/// requests if out-of-order requests are detected, and flushes it to the -/// underlying durability layer once it is able to linearize the offset sequence. -/// -/// Since we expect out-of-order requests to happen very rarely, this measure -/// should not negatively impact throughput in the common case, unlike holding -/// the transaction lock until request submission is complete. -/// -/// Note that the commitlog rejects out-of-order commits, so if a missing offset -/// arrives outside `reorder_window_size` (or never), already committed -/// transactions may be lost (by way of the durability worker crashing). -/// Those transactions will not be confirmed, however, so this is safe. -/// -/// [RelationalDB]: crate::db::relational_db::RelationalDB -pub struct DurabilityWorker { - database: Identity, - request_tx: Sender, - shutdown: Sender, - durability: Arc, - runtime: runtime::Handle, -} - -impl DurabilityWorker { - /// Create a new [`DurabilityWorker`] using the given `durability` policy. 
- /// - /// Background tasks will be spawned onto to provided tokio `runtime`. - pub fn new( - database: Identity, - durability: Arc, - runtime: runtime::Handle, - next_tx_offset: TxOffset, - reorder_window_size: NonZeroUsize, - ) -> Self { - let (request_tx, request_rx) = channel(4 * 4096); - let (shutdown_tx, shutdown_rx) = channel(1); - - let actor = DurabilityWorkerActor { - request_rx, - shutdown: shutdown_rx, - durability: durability.clone(), - reorder_window: ReorderWindow::new(next_tx_offset, reorder_window_size), - reorder_window_len: WORKER_METRICS - .durability_worker_reorder_window_length - .with_label_values(&database), - }; - let _enter = runtime.enter(); - tokio::spawn( - actor - .run() - .instrument(info_span!("durability_worker", database = %database)), + tx_data: &Arc, +) { + let Some(tx_offset) = tx_data.tx_offset() else { + let name = reducer_context.as_ref().map(|rcx| &rcx.name); + debug_assert!( + !tx_data.has_rows_or_connect_disconnect(name), + "tx_data has no rows but has connect/disconnect: `{name:?}`" ); - - Self { - database, - request_tx, - shutdown: shutdown_tx, - durability, - runtime, - } - } - - /// Request that a transaction be made durable. - /// That is, if `(tx_data, ctx)` should be appended to the commitlog, do so. - /// - /// Note that by this stage - /// [`spacetimedb_datastore::locking_tx_datastore::committed_state::tx_consumes_offset`] - /// has already decided based on the reducer and operations whether the transaction should be appended; - /// this method is responsible only for reading its decision out of the `tx_data` - /// and calling `durability.append_tx`. - /// - /// This method sends the work to an actor that collects data and calls `durability.append_tx`. - /// It blocks if the queue is at capacity. - /// - /// # Panics - /// - /// Panics if the durability worker has already closed the receive end of - /// its queue. This may happen if - /// - /// - the backing [Durability] has panicked, or - /// - [Self::shutdown] was called - /// - pub fn request_durability(&self, reducer_context: Option, tx_data: &Arc) { - // We first try to send it without blocking. - match self.request_tx.try_reserve() { - Ok(permit) => { - permit.send(DurabilityRequest { - reducer_context, - tx_data: tx_data.clone(), - }); - } - Err(mpsc::error::TrySendError::Closed(_)) => { - panic!("durability actor vanished database={}", self.database); - } - Err(mpsc::error::TrySendError::Full(_)) => { - // If the channel was full, we use the blocking version. - let start = std::time::Instant::now(); - let send = || { - self.request_tx.blocking_send(DurabilityRequest { - reducer_context, - tx_data: tx_data.clone(), - }) - }; - if tokio::runtime::Handle::try_current().is_ok() { - tokio::task::block_in_place(send) - } else { - send() - } - .unwrap_or_else(|_| panic!("durability actor vanished database={}", self.database)); - // We could cache this metric, but if we are already in the blocking code path, - // the extra time of looking up the metric is probably negligible. - WORKER_METRICS - .durability_blocking_send_duration - .with_label_values(&self.database) - .observe(start.elapsed().as_secs_f64()); - } - } - } - - /// Get the [`DurableOffset`] of this database. - pub fn durable_tx_offset(&self) -> DurableOffset { - self.durability.durable_tx_offset() - } - - /// Shut down the worker without dropping it, - /// flushing outstanding transaction. 
- /// - /// Closes the internal channel, then waits for the [DurableOffset] to - /// report the offset of the most recently enqueued transaction as durable. - /// - /// # Panics - /// - /// After this method was called, calling [Self::request_durability] - /// will panic. - pub async fn close(&self) -> Option { - let (done_tx, done_rx) = oneshot::channel(); - // Channel errors can be ignored. - // It just means that the actor already exited. - let _ = self - .shutdown - .send(done_tx) - .map_err(drop) - .and_then(|()| done_rx.map_err(drop)) - .and_then(|done| async move { - done.await; - Ok(()) - }) - .await; - self.durability.close().await - } - - /// Consume `self` and run [Self::close]. - /// - /// The `lock_file` is not dropped until the shutdown is complete (either - /// successfully or unsuccessfully). This is to prevent the database to be - /// re-opened for writing while there is still an active background task - /// writing to the commitlog. - /// - /// The shutdown task will be spawned onto the tokio runtime provided to - /// [Self::new]. This means that the task may still be running when this - /// method returns. - /// - /// `database_identity` is used to associate log records with the database - /// owning this durability worker. - /// - /// This method is used in the `Drop` impl for [crate::db::relational_db::RelationalDB]. - pub(super) fn spawn_close(self, database_identity: Identity) { - let rt = self.runtime.clone(); - rt.spawn(async move { - let label = format!("[{database_identity}]"); - // Apply a timeout, in case `Durability::close` doesn't terminate - // as advertised. This is a bug, but panicking here would not - // unwind at the call site. - match timeout(Duration::from_secs(10), self.close()).await { - Err(_elapsed) => { - error!("{label} timeout waiting for durability worker shutdown"); - } - Ok(offset) => { - info!("{label} durability worker shut down at tx offset: {offset:?}"); - } - } - }); - } + return; + }; + let tx_data = tx_data.clone(); + durability.append_tx(Box::new(move || { + prepare_tx_data_for_durability(tx_offset, reducer_context, &tx_data) + })); } -#[derive(Debug, Error)] -enum ReorderError { - #[error("reordering window exceeded")] - SizeExceeded, - #[error("transaction offset behind expected offset")] - TxBehind, -} - -/// A bounded collection of elements ordered by [TxOffset], backed by a [BinaryHeap]. -/// -/// This exists to tolerate slightly out-of-order requests. -/// See the struct docs for [DurabilityWorker] for more context. -struct ReorderWindow { - heap: BinaryHeap>>, - next_tx: TxOffset, - max_len: NonZeroUsize, -} - -impl ReorderWindow { - pub fn new(next_tx: TxOffset, max_len: NonZeroUsize) -> Self { - // We expect that requests usually arrive in order, - // so allocate only a single element for the common case. - let heap = BinaryHeap::with_capacity(1); - Self { heap, next_tx, max_len } - } - - /// Push a durability request onto the heap. - /// - /// # Errors - /// - /// The method returns an error if: - /// - /// - the window is full, i.e. `self.len() >= self.max_len` - /// - the `tx_offset` of the request is smaller than the next expected offset - /// - pub fn push(&mut self, req: TxOrdered) -> Result<(), ReorderError> { - if self.len() >= self.max_len.get() { - return Err(ReorderError::SizeExceeded); - } - if req.tx_offset < self.next_tx { - return Err(ReorderError::TxBehind); - } - // We've got an out-of-order request, - // eagerly allocate the max capacity. 
- if self.len() > 0 { - self.heap.reserve_exact(self.max_len.get()); - } - self.heap.push(Reverse(req)); - - Ok(()) - } - - /// Remove all [DurabilityRequest]s in order, until a gap in the offset - /// sequence is detected or the heap is empty. - pub fn drain(&mut self) -> impl Iterator { - iter::from_fn(|| { - let min_tx_offset = self.heap.peek().map(|Reverse(x)| x.tx_offset); - if min_tx_offset.is_some_and(|tx_offset| tx_offset == self.next_tx) { - let Reverse(TxOrdered { inner: request, .. }) = self.heap.pop().unwrap(); - self.next_tx += 1; - Some(request) - } else { - None +pub(super) fn spawn_close(durability: Arc, runtime: &runtime::Handle, database_identity: Identity) { + let rt = runtime.clone(); + rt.spawn(async move { + let label = format!("[{database_identity}]"); + match timeout(Duration::from_secs(10), durability.close()).await { + Err(_elapsed) => { + error!("{label} timeout waiting for durability shutdown"); } - }) - } - - pub fn len(&self) -> usize { - self.heap.len() - } -} - -pub struct DurabilityWorkerActor { - request_rx: mpsc::Receiver, - shutdown: Receiver, - durability: Arc, - reorder_window: ReorderWindow, - reorder_window_len: IntGauge, -} - -impl DurabilityWorkerActor { - /// Processes requests to do durability. - async fn run(mut self) { - // When this future completes or is cancelled, ensure that: - // - shutdown waiters are notified - // - metrics are reset - let done = scopeguard::guard(Arc::new(Notify::new()), |done| { - done.notify_waiters(); - self.reorder_window_len.set(0); - }); - - loop { - tokio::select! { - // Biased towards the shutdown channel, - // so that adding new requests is prevented promptly. - biased; - - Some(reply) = self.shutdown.recv() => { - self.request_rx.close(); - let _ = reply.send(done.clone().notified_owned()); - }, - - req = self.request_rx.recv() => { - let Some(request) = req else { - break; - }; - match request.tx_data.tx_offset() { - // Drop the request if it doesn't have a tx offset. - None => { - let name = request.reducer_context.as_ref().map(|rcx| &rcx.name); - debug_assert!( - !request.tx_data.has_rows_or_connect_disconnect(name), - "tx_data has no rows but has connect/disconnect: `{name:?}`" - ); - }, - // Otherwise, push to the reordering window. - Some(tx_offset) => { - let request = TxOrdered { tx_offset, inner: request }; - if let Err(e) = self.reorder_window.push(request) { - error!("{e}"); - break; - } - }, - } - } + Ok(offset) => { + info!("{label} durability shut down at tx offset: {offset:?}"); } - - // Drain all requests that are properly ordered. - self.reorder_window - .drain() - .for_each(|request| Self::do_durability(&*self.durability, request.reducer_context, &request.tx_data)); - self.reorder_window_len.set(self.reorder_window.len() as _); } + }); +} - info!("durability worker actor done"); - } - - pub fn do_durability(durability: &Durability, reducer_context: Option, tx_data: &TxData) { - let tx_offset = tx_data - .tx_offset() - .expect("txs without offset should have been dropped"); - - let mut inserts: Box<_> = tx_data - .persistent_inserts() - .map(|(table_id, rowdata)| Ops { table_id, rowdata }) - .collect(); - // What we get from `tx_data` is not necessarily sorted, - // but the durability layer expects by-table_id sorted data. - // Unstable sorts are valid, there will only ever be one entry per table_id. 
- inserts.sort_unstable_by_key(|ops| ops.table_id); - - let mut deletes: Box<_> = tx_data - .persistent_deletes() - .map(|(table_id, rowdata)| Ops { table_id, rowdata }) - .collect(); - deletes.sort_unstable_by_key(|ops| ops.table_id); - - let mut truncates: Box<[_]> = tx_data.persistent_truncates().collect(); - truncates.sort_unstable_by_key(|table_id| *table_id); - - let inputs = reducer_context.map(|rcx| rcx.into()); - - debug_assert!( - !(inserts.is_empty() && truncates.is_empty() && deletes.is_empty() && inputs.is_none()), - "empty transaction" - ); - - let txdata = Txdata { +fn prepare_tx_data_for_durability( + tx_offset: u64, + reducer_context: Option, + tx_data: &TxData, +) -> Transaction> { + let mut inserts: Box<_> = tx_data + .persistent_inserts() + .map(|(table_id, rowdata)| Ops { table_id, rowdata }) + .collect(); + inserts.sort_unstable_by_key(|ops| ops.table_id); + + let mut deletes: Box<_> = tx_data + .persistent_deletes() + .map(|(table_id, rowdata)| Ops { table_id, rowdata }) + .collect(); + deletes.sort_unstable_by_key(|ops| ops.table_id); + + let mut truncates: Box<[_]> = tx_data.persistent_truncates().collect(); + truncates.sort_unstable_by_key(|table_id| *table_id); + + let inputs = reducer_context.map(|rcx| rcx.into()); + + debug_assert!( + !(inserts.is_empty() && truncates.is_empty() && deletes.is_empty() && inputs.is_none()), + "empty transaction" + ); + + Transaction { + offset: tx_offset, + txdata: Txdata { inputs, outputs: None, mutations: Some(Mutations { @@ -412,213 +84,6 @@ impl DurabilityWorkerActor { deletes, truncates, }), - }; - - // This does not block, as per trait docs. - durability.append_tx(Transaction { - offset: tx_offset, - txdata, - }); - } -} - -/// Wrapper to sort [DurabilityRequest]s by [TxOffset]. -struct TxOrdered { - tx_offset: TxOffset, - inner: T, -} - -impl PartialEq for TxOrdered { - fn eq(&self, other: &Self) -> bool { - self.tx_offset == other.tx_offset - } -} - -impl Eq for TxOrdered {} - -#[allow(clippy::non_canonical_partial_ord_impl)] -impl PartialOrd for TxOrdered { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.tx_offset.cmp(&other.tx_offset)) - } -} - -impl Ord for TxOrdered { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.partial_cmp(other).unwrap() - } -} - -#[cfg(test)] -mod tests { - use std::{pin::pin, task::Poll}; - - use futures::FutureExt as _; - use pretty_assertions::assert_matches; - use spacetimedb_sats::product; - use spacetimedb_schema::table_name::TableName; - use tokio::sync::watch; - - use super::*; - use crate::db::relational_db::Txdata; - - #[derive(Default)] - struct CountingDurability { - appended: watch::Sender>, - durable: watch::Sender>, - } - - impl CountingDurability { - async fn mark_durable(&self, offset: TxOffset) { - self.appended - .subscribe() - .wait_for(|x| x.is_some_and(|appended_offset| appended_offset >= offset)) - .await - .unwrap(); - self.durable.send_modify(|durable_offset| { - durable_offset.replace(offset); - }); - } - } - - impl spacetimedb_durability::Durability for CountingDurability { - type TxData = Txdata; - - fn append_tx(&self, tx: Transaction) { - self.appended.send_modify(|offset| { - offset.replace(tx.offset); - }); - } - - fn durable_tx_offset(&self) -> DurableOffset { - self.durable.subscribe().into() - } - - fn close(&self) -> spacetimedb_durability::Close { - let mut durable = self.durable.subscribe(); - let appended = self.appended.subscribe(); - async move { - let durable_offset = durable - .wait_for(|durable| match 
(*durable).zip(*appended.borrow()) { - Some((durable_offset, appended_offset)) => durable_offset >= appended_offset, - None => false, - }) - .await - .unwrap(); - *durable_offset - } - .boxed() - } - } - - #[tokio::test(flavor = "multi_thread")] - async fn shutdown_waits_until_durable() { - let durability = Arc::new(CountingDurability::default()); - let worker = DurabilityWorker::new( - Identity::ONE, - durability.clone(), - runtime::Handle::current(), - 0, - NonZeroUsize::new(1).unwrap(), - ); - for i in 0..=10 { - let mut txdata = TxData::default(); - txdata.set_tx_offset(i); - // Ensure the transaction is non-empty. - txdata.set_inserts_for_table(4000.into(), &TableName::for_test("foo"), [product![42u8]].into()); - - worker.request_durability(None, &Arc::new(txdata)); - } - - let shutdown = worker.close(); - let mut shutdown_fut = pin!(shutdown); - assert_matches!( - futures::poll!(&mut shutdown_fut), - Poll::Pending, - "shutdown should be pending because requested > durable" - ); - - durability.mark_durable(5).await; - assert_matches!( - futures::poll!(&mut shutdown_fut), - Poll::Pending, - "shutdown should be pending because requested > durable" - ); - - durability.mark_durable(10).await; - assert_matches!( - futures::poll!(&mut shutdown_fut), - Poll::Ready(Some(10)), - "shutdown returns, reporting durable offset at 10" - ); - assert_eq!( - Some(10), - *durability.appended.borrow(), - "durability should have appended up to tx offset 10" - ); - } - - #[test] - fn reorder_window_sorts_by_tx_offset() { - let mut win = ReorderWindow::new(0, NonZeroUsize::new(5).unwrap()); - - for tx_offset in (0..5).rev() { - win.push(TxOrdered { - tx_offset, - inner: tx_offset, - }) - .unwrap(); - } - - let txs = win.drain().collect::>(); - assert_eq!(txs, &[0, 1, 2, 3, 4]); - } - - #[test] - fn reorder_window_stops_drain_at_gap() { - let mut win = ReorderWindow::new(0, NonZeroUsize::new(5).unwrap()); - - win.push(TxOrdered { tx_offset: 4, inner: 4 }).unwrap(); - assert!(win.drain().collect::>().is_empty()); - - for tx_offset in 0..4 { - win.push(TxOrdered { - tx_offset, - inner: tx_offset, - }) - .unwrap(); - } - - let txs = win.drain().collect::>(); - assert_eq!(&txs, &[0, 1, 2, 3, 4]); - } - - #[test] - fn reorder_window_error_when_full() { - let mut win = ReorderWindow::new(0, NonZeroUsize::new(1).unwrap()); - win.push(TxOrdered { - tx_offset: 0, - inner: (), - }) - .unwrap(); - assert_matches!( - win.push(TxOrdered { - tx_offset: 1, - inner: () - }), - Err(ReorderError::SizeExceeded) - ); - } - - #[test] - fn reorder_window_error_on_late_request() { - let mut win = ReorderWindow::new(1, NonZeroUsize::new(5).unwrap()); - assert_matches!( - win.push(TxOrdered { - tx_offset: 0, - inner: () - }), - Err(ReorderError::TxBehind) - ); + }, } } diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index f1c70bec8ee..e2ec35fcccf 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,4 +1,4 @@ -use crate::db::durability::DurabilityWorker; +use crate::db::durability::{request_durability, spawn_close as spawn_durability_close}; use crate::db::MetricsRecorderQueue; use crate::error::{DBError, RestoreSnapshotError}; use crate::subscription::ExecutionCounters; @@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk}; use spacetimedb_data_structures::map::HashSet; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError}; -use 
spacetimedb_datastore::execution_context::{Workload, WorkloadType}; +use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView, @@ -58,7 +58,6 @@ use spacetimedb_table::table::{RowRef, TableScanIter}; use spacetimedb_table::table_index::IndexKey; use std::borrow::Cow; use std::io; -use std::num::NonZeroUsize; use std::ops::RangeBounds; use std::sync::Arc; use tokio::sync::watch; @@ -99,7 +98,8 @@ pub struct RelationalDB { owner_identity: Identity, inner: Locking, - durability: Option, + durability: Option>, + durability_runtime: Option, snapshot_worker: Option, row_count_fn: RowCountFn, @@ -134,8 +134,8 @@ impl std::fmt::Debug for RelationalDB { impl Drop for RelationalDB { fn drop(&mut self) { // Attempt to flush the outstanding transactions. - if let Some(worker) = self.durability.take() { - worker.spawn_close(self.database_identity); + if let (Some(durability), Some(runtime)) = (self.durability.take(), self.durability_runtime.take()) { + spawn_durability_close(durability, &runtime, self.database_identity); } } } @@ -151,21 +151,12 @@ impl RelationalDB { let workload_type_to_exec_counters = Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity))); - let (durability, disk_size_fn, snapshot_worker, rt) = Persistence::unzip(persistence); - let durability = durability.zip(rt).map(|(durability, rt)| { - let next_tx_offset = { - let tx = inner.begin_tx(Workload::Internal); - let next_tx_offset = tx.tx_offset(); - let _ = inner.release_tx(tx); - next_tx_offset.into_inner() - }; - let reorder_window_size = NonZeroUsize::new(8).unwrap(); - DurabilityWorker::new(database_identity, durability, rt, next_tx_offset, reorder_window_size) - }); + let (durability, disk_size_fn, snapshot_worker, durability_runtime) = Persistence::unzip(persistence); Self { inner, durability, + durability_runtime, snapshot_worker, database_identity, @@ -814,17 +805,15 @@ impl RelationalDB { let reducer_context = tx.ctx.reducer_context().cloned(); // TODO: Never returns `None` -- should it? - let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else { + let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx_and_then(tx, |tx_data| { + self.request_durability(reducer_context, tx_data); + })? 
+ else { return Ok(None); }; self.maybe_do_snapshot(&tx_data); - let tx_data = Arc::new(tx_data); - if let Some(durability) = &self.durability { - durability.request_durability(reducer_context, &tx_data); - } - Ok(Some((tx_offset, tx_data, tx_metrics, reducer))) } @@ -832,15 +821,13 @@ impl RelationalDB { pub fn commit_tx_downgrade(&self, tx: MutTx, workload: Workload) -> (Arc, TxMetrics, Tx) { log::trace!("COMMIT MUT TX"); - let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade(tx, workload); + let reducer_context = tx.ctx.reducer_context().cloned(); + let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade_and_then(tx, workload, |tx_data| { + self.request_durability(reducer_context, tx_data); + }); self.maybe_do_snapshot(&tx_data); - let tx_data = Arc::new(tx_data); - if let Some(durability) = &self.durability { - durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data); - } - (tx_data, tx_metrics, tx) } @@ -852,6 +839,12 @@ impl RelationalDB { .map(|durability| durability.durable_tx_offset()) } + fn request_durability(&self, reducer_context: Option, tx_data: &Arc) { + if let Some(durability) = &self.durability { + request_durability(durability.as_ref(), reducer_context, tx_data); + } + } + /// Decide based on the `committed_state.next_tx_offset` /// whether to request that the [`SnapshotWorker`] in `self` capture a snapshot of the database. /// diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 92f296f3b8c..301bbc47767 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1900,7 +1900,7 @@ mod tests { use spacetimedb_commitlog::{commitlog, repo}; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap}; use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; - use spacetimedb_durability::{Durability, EmptyHistory, Transaction, TxOffset}; + use spacetimedb_durability::{Durability, EmptyHistory, TxOffset}; use spacetimedb_execution::dml::MutDatastore; use spacetimedb_lib::bsatn::ToBsatn; use spacetimedb_lib::db::auth::StAccess; @@ -1987,7 +1987,8 @@ mod tests { impl Durability for ManualDurability { type TxData = Txdata; - fn append_tx(&self, tx: Transaction) { + fn append_tx(&self, tx: spacetimedb_durability::PreparedTx) { + let tx = tx.into_transaction(); let mut commitlog = self.commitlog.write().unwrap(); commitlog.commit([tx]).expect("commit failed"); commitlog.flush().expect("error flushing commitlog"); diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 7f5492a7057..787fd9f6d57 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -491,11 +491,6 @@ metrics_group!( #[labels(database_identity: Identity)] #[buckets(0.001, 0.01, 0.1, 1.0, 10.0)] pub durability_blocking_send_duration: HistogramVec, - - #[name = spacetime_durability_worker_reorder_window_length] - #[help = "The number of transactions currently being held in the reorder window"] - #[labels(db: Identity)] - pub durability_worker_reorder_window_length: IntGaugeVec, } ); diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 53327b55c8b..336476fa6e6 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -968,6 +968,27 @@ impl 
Locking { pub fn commit_mut_tx_downgrade(&self, tx: MutTxId, workload: Workload) -> (TxData, TxMetrics, TxId) { tx.commit_downgrade(workload) } + + /// Commit `tx` and invoke `before_release` while the write lock is still held. + #[allow(clippy::type_complexity)] + pub fn commit_mut_tx_and_then( + &self, + tx: MutTxId, + before_release: impl FnOnce(&Arc), + ) -> Result, TxMetrics, Option)>> { + Ok(Some(tx.commit_and_then(before_release))) + } + + /// Commit `tx`, invoke `before_downgrade` while the write lock is still held, + /// then downgrade the lock to a read-only transaction. + pub fn commit_mut_tx_downgrade_and_then( + &self, + tx: MutTxId, + workload: Workload, + before_downgrade: impl FnOnce(&Arc), + ) -> (Arc, TxMetrics, TxId) { + tx.commit_downgrade_and_then(workload, before_downgrade) + } } #[derive(Debug, Error)] diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 15772807cba..6e817fbdccd 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1953,7 +1953,17 @@ impl MutTxId { /// - [`TxData`], the set of inserts and deletes performed by this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran during this transaction. - pub(super) fn commit(mut self) -> (TxOffset, TxData, TxMetrics, Option) { + pub(super) fn commit(self) -> (TxOffset, TxData, TxMetrics, Option) { + let (tx_offset, tx_data, tx_metrics, reducer) = self.commit_and_then(|_| {}); + let tx_data = + Arc::try_unwrap(tx_data).unwrap_or_else(|_| panic!("noop commit callback must not retain tx data")); + (tx_offset, tx_data, tx_metrics, reducer) + } + + pub(super) fn commit_and_then( + mut self, + before_release: impl FnOnce(&Arc), + ) -> (TxOffset, Arc, TxMetrics, Option) { let tx_offset = self.committed_state_write_lock.next_tx_offset; let tx_data = self .committed_state_write_lock @@ -1987,6 +1997,9 @@ impl MutTxId { tx_offset }; + let tx_data = Arc::new(tx_data); + before_release(&tx_data); + (tx_offset, tx_data, tx_metrics, reducer) } @@ -2003,7 +2016,18 @@ impl MutTxId { /// - [`TxData`], the set of inserts and deletes performed by this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - [`TxId`], a read-only transaction with a shared lock on the committed state. 
- pub(super) fn commit_downgrade(mut self, workload: Workload) -> (TxData, TxMetrics, TxId) { + pub(super) fn commit_downgrade(self, workload: Workload) -> (TxData, TxMetrics, TxId) { + let (tx_data, tx_metrics, tx) = self.commit_downgrade_and_then(workload, |_| {}); + let tx_data = + Arc::try_unwrap(tx_data).unwrap_or_else(|_| panic!("noop commit callback must not retain tx data")); + (tx_data, tx_metrics, tx) + } + + pub(super) fn commit_downgrade_and_then( + mut self, + workload: Workload, + before_downgrade: impl FnOnce(&Arc), + ) -> (Arc, TxMetrics, TxId) { let tx_data = self .committed_state_write_lock .merge(self.tx_state, self.read_sets, &self.ctx); @@ -2022,6 +2046,9 @@ impl MutTxId { &self.committed_state_write_lock, ); + let tx_data = Arc::new(tx_data); + before_downgrade(&tx_data); + // Update the workload type of the execution context self.ctx.workload = workload.workload_type(); let tx = TxId { diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 743740f6242..43388128eeb 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -22,7 +22,7 @@ use tokio::{ }; use tracing::{instrument, Span}; -use crate::{Close, Durability, DurableOffset, History, TxOffset}; +use crate::{Close, Durability, DurableOffset, History, PreparedTx, TxOffset}; pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; @@ -38,7 +38,8 @@ pub struct Options { /// transactions that are currently in the queue, but shrink the buffer to /// `batch_capacity` if it had to make additional space during a burst. /// - /// The internal queue of [Local] is bounded to `2 * batch_capacity`. + /// The internal queue of [Local] is bounded to + /// `Options::QUEUE_CAPACITY_MULTIPLIER * batch_capacity`. /// /// Default: 4096 pub batch_capacity: NonZeroUsize, @@ -48,6 +49,11 @@ pub struct Options { impl Options { pub const DEFAULT_BATCH_CAPACITY: NonZeroUsize = NonZeroUsize::new(4096).unwrap(); + pub const QUEUE_CAPACITY_MULTIPLIER: usize = 4; + + fn queue_capacity(self) -> usize { + Self::QUEUE_CAPACITY_MULTIPLIER * self.batch_capacity.get() + } } impl Default for Options { @@ -89,9 +95,11 @@ pub struct Local { /// Backlog of transactions to be written to disk by the background /// [`PersisterTask`]. /// - /// The queue is bounded to `4 * Option::batch_capacity`. - queue: mpsc::Sender>>, - /// How many transactions are sitting in the `queue`. + /// The queue is bounded to + /// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`. + queue: mpsc::Sender>>, + /// How many transactions are pending durability, including items buffered + /// in the queue and items currently being written by the actor. /// /// This is mainly for observability purposes, and can thus be updated with /// relaxed memory ordering. 
@@ -128,7 +136,8 @@ impl Local { opts.commitlog, on_new_segment, )?); - let (queue, txdata_rx) = mpsc::channel(4 * opts.batch_capacity.get()); + let queue_capacity = opts.queue_capacity(); + let (queue, txdata_rx) = mpsc::channel(queue_capacity); let queue_depth = Arc::new(AtomicU64::new(0)); let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); let (shutdown_tx, shutdown_rx) = mpsc::channel(1); @@ -209,7 +218,7 @@ impl Actor { #[instrument(name = "durability::local::actor", skip_all)] async fn run( self, - mut transactions_rx: mpsc::Receiver>>, + mut transactions_rx: mpsc::Receiver>>, mut shutdown_rx: mpsc::Receiver>, ) { info!("starting durability actor"); @@ -239,11 +248,16 @@ impl Actor { if n == 0 { break; } - self.queue_depth.fetch_sub(n as u64, Relaxed); + if tx_buf.is_empty() { + continue; + } + let clog = self.clog.clone(); - tx_buf = spawn_blocking(move || -> io::Result>>> { + let ready_len = tx_buf.len(); + self.queue_depth.fetch_sub(ready_len as u64, Relaxed); + tx_buf = spawn_blocking(move || -> io::Result>>> { for tx in tx_buf.drain(..) { - clog.commit([tx])?; + clog.commit([tx.into_transaction()])?; } Ok(tx_buf) }) @@ -329,24 +343,30 @@ impl Drop for Lock { impl Durability for Local { type TxData = Txdata; - fn append_tx(&self, tx: Transaction) { - match self.queue.try_reserve() { - Ok(permit) => permit.send(tx), + fn append_tx(&self, tx: PreparedTx) { + let mut tx = Some(tx); + let blocked = match self.queue.try_reserve() { + Ok(permit) => { + permit.send(tx.take().expect("tx already sent")); + false + } Err(mpsc::error::TrySendError::Closed(_)) => { panic!("durability actor crashed"); } Err(mpsc::error::TrySendError::Full(_)) => { - let send = || self.queue.blocking_send(tx); + let mut send = || self.queue.blocking_send(tx.take().expect("tx already sent")); if tokio::runtime::Handle::try_current().is_ok() { tokio::task::block_in_place(send) } else { send() } .expect("durability actor crashed"); + true } - } + }; self.queue_depth.fetch_add(1, Relaxed); + let _ = blocked; } fn durable_tx_offset(&self) -> DurableOffset { diff --git a/crates/durability/src/imp/mod.rs b/crates/durability/src/imp/mod.rs index 1636e05cc51..3e00ae21ee1 100644 --- a/crates/durability/src/imp/mod.rs +++ b/crates/durability/src/imp/mod.rs @@ -15,7 +15,7 @@ mod testing { use futures::FutureExt as _; use tokio::sync::watch; - use crate::{Close, Durability, DurableOffset, Transaction, TxOffset}; + use crate::{Close, Durability, DurableOffset, PreparedTx, TxOffset}; /// A [`Durability`] impl that sends all transactions into the void. /// @@ -41,7 +41,7 @@ mod testing { impl Durability for NoDurability { type TxData = T; - fn append_tx(&self, _: Transaction) { + fn append_tx(&self, _: PreparedTx) { if self.closed.load(Ordering::Relaxed) { panic!("`close` was called on this `NoDurability` instance"); } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 7722d86914a..2b2aadcbbfb 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -91,6 +91,29 @@ impl From>> for DurableOffset { /// can be used as a trait object without knowing the type of the `close` future. pub type Close = BoxFuture<'static, Option>; +/// Object-safe conversion into an owned [Transaction]. 
+pub trait IntoTransaction: Send { + fn into_transaction(self: Box) -> Transaction; +} + +impl IntoTransaction for Transaction { + fn into_transaction(self: Box) -> Transaction { + *self + } +} + +impl IntoTransaction for F +where + T: Send + 'static, + F: FnOnce() -> Transaction + Send + 'static, +{ + fn into_transaction(self: Box) -> Transaction { + self() + } +} + +pub type PreparedTx = Box>; + /// The durability API. /// /// NOTE: This is a preliminary definition, still under consideration. @@ -105,7 +128,7 @@ pub trait Durability: Send + Sync { /// The payload representing a single transaction. type TxData; - /// Submit a [Transaction] to be made durable. + /// Submit work that yields a [Transaction] to be made durable. /// /// This method must never block, and accept new transactions even if they /// cannot be made durable immediately. @@ -121,7 +144,7 @@ pub trait Durability: Send + Sync { // (i.e. a torn write will corrupt all transactions contained in it), and it // is very unclear when it is both correct and beneficial to bundle more // than a single transaction into a commit. - fn append_tx(&self, tx: Transaction); + fn append_tx(&self, tx: PreparedTx); /// Obtain a handle to the [DurableOffset]. fn durable_tx_offset(&self) -> DurableOffset; diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs index b4c6dbd1a7b..05695e20302 100644 --- a/crates/durability/tests/io/fallocate.rs +++ b/crates/durability/tests/io/fallocate.rs @@ -36,7 +36,7 @@ use spacetimedb_commitlog::{ segment, tests::helpers::enable_logging, }; -use spacetimedb_durability::{local::OpenError, Durability, Txdata}; +use spacetimedb_durability::{local::OpenError, Durability, Transaction, Txdata}; use spacetimedb_paths::{server::ReplicaDir, FromPathUnchecked}; use tempfile::{NamedTempFile, TempDir}; use tokio::{sync::watch, time::sleep}; @@ -99,7 +99,7 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { new_segment_rx.borrow_and_update(); // Write past available space. for offset in 0..256 { - durability.append_tx((offset, txdata.clone()).into()); + durability.append_tx(Box::new(Transaction::from((offset, txdata.clone())))); } // Ensure new segment is created. new_segment_rx.changed().await?; @@ -107,7 +107,7 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { sleep(Duration::from_millis(5)).await; // Durability actor should have crashed, so this should panic. info!("trying append on crashed durability"); - durability.append_tx((256, txdata.clone()).into()); + durability.append_tx(Box::new(Transaction::from((256, txdata.clone())))); } Ok(())
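
Note on the API shape (commentary, not part of the diff): the new `IntoTransaction` trait and `PreparedTx` alias in `crates/durability/src/lib.rs` let a caller hand `append_tx` either a ready `Transaction` or a closure that builds one, so the Txdata preparation can be deferred out of the commit path. Below is a minimal, self-contained sketch of that shape; the generic parameters and bounds are inferred from context rather than copied from the crate, and `main` is purely illustrative.

// Sketch only: mirrors the shape added in crates/durability/src/lib.rs,
// with inferred generics and toy payload types.
pub struct Transaction<T> {
    pub offset: u64,
    pub txdata: T,
}

/// Object-safe conversion into an owned `Transaction<T>`.
pub trait IntoTransaction<T>: Send {
    fn into_transaction(self: Box<Self>) -> Transaction<T>;
}

/// An already built transaction converts by unboxing itself.
impl<T: Send> IntoTransaction<T> for Transaction<T> {
    fn into_transaction(self: Box<Self>) -> Transaction<T> {
        *self
    }
}

/// A closure converts by running, so the payload is built only when needed.
impl<T, F> IntoTransaction<T> for F
where
    T: Send + 'static,
    F: FnOnce() -> Transaction<T> + Send + 'static,
{
    fn into_transaction(self: Box<Self>) -> Transaction<T> {
        self()
    }
}

/// What `append_tx` accepts in this sketch.
pub type PreparedTx<T> = Box<dyn IntoTransaction<T>>;

fn main() {
    // Eager: hand over an already built transaction.
    let eager: PreparedTx<Vec<u8>> = Box::new(Transaction { offset: 1, txdata: vec![1, 2, 3] });
    // Lazy: hand over a closure; conversion work is deferred to the consumer.
    let lazy: PreparedTx<Vec<u8>> = Box::new(|| Transaction { offset: 2, txdata: vec![4, 5, 6] });

    for prepared in [eager, lazy] {
        let tx = prepared.into_transaction();
        println!("offset={} payload_len={}", tx.offset, tx.txdata.len());
    }
}

Taking `self: Box<Self>` is what keeps the trait object-safe: the durability layer can hold a `Box<dyn IntoTransaction<_>>` without knowing whether it wraps a value or a closure.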
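
Why the reorder window and the background `DurabilityWorker` could be deleted: `commit_mut_tx_and_then` and `commit_mut_tx_downgrade_and_then` invoke the durability request while the committed-state write lock is still held, so requests reach the durability layer in exact offset order and no linearization after the fact is needed. A simplified sketch of that hook shape follows; `Datastore`, `CommittedState`, and `TxData` here are stand-ins, not the crate's real definitions.

// Sketch only: the "commit and then" callback runs before the lock is released.
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct CommittedState {
    next_tx_offset: u64,
}

struct TxData {
    tx_offset: u64,
}

struct Datastore {
    committed: Mutex<CommittedState>,
}

impl Datastore {
    /// Commit a (toy) transaction and invoke `before_release` while the
    /// write lock on the committed state is still held.
    fn commit_and_then(&self, before_release: impl FnOnce(&Arc<TxData>)) -> Arc<TxData> {
        let mut committed = self.committed.lock().unwrap();
        let tx_data = Arc::new(TxData { tx_offset: committed.next_tx_offset });
        committed.next_tx_offset += 1;
        // Still under the lock: whatever the callback enqueues is already
        // ordered by offset, so no reorder window is required downstream.
        before_release(&tx_data);
        tx_data
    }
}

fn main() {
    let db = Datastore { committed: Mutex::new(CommittedState::default()) };
    let mut durable_queue: Vec<u64> = Vec::new();
    for _ in 0..3 {
        db.commit_and_then(|tx_data| durable_queue.push(tx_data.tx_offset));
    }
    assert_eq!(durable_queue, [0, 1, 2]);
}

Because the callback sees the `Arc<TxData>` before the lock is released, submissions are already linearized, which is exactly what the removed `ReorderWindow` used to reconstruct afterwards.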
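
`Local::append_tx` keeps the existing enqueue strategy: reserve a slot without blocking when possible, and only fall back to `blocking_send` (via `block_in_place` when already on a Tokio runtime thread) when the bounded queue is full. A standalone sketch of that pattern follows; the `enqueue` helper, payload type, and channel capacity are placeholders, not APIs of the durability crate.

// Sketch only: non-blocking fast path with a blocking fallback under backpressure.
use tokio::sync::mpsc;

fn enqueue<T>(queue: &mpsc::Sender<T>, item: T) {
    match queue.try_reserve() {
        // Fast path: a slot is free, send without blocking.
        Ok(permit) => permit.send(item),
        Err(mpsc::error::TrySendError::Closed(_)) => panic!("consumer vanished"),
        Err(mpsc::error::TrySendError::Full(_)) => {
            // Slow path: wait for the consumer to free a slot, and tell the
            // runtime about the blocking section if we are on a worker thread.
            let send = || queue.blocking_send(item);
            let result = if tokio::runtime::Handle::try_current().is_ok() {
                tokio::task::block_in_place(send)
            } else {
                send()
            };
            result.expect("consumer vanished");
        }
    }
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // Toy payload and capacity; the real queue carries prepared transactions
    // and is sized from `Options::batch_capacity`.
    let (tx, mut rx) = mpsc::channel::<u64>(2);
    let producer = std::thread::spawn(move || {
        for i in 0..8 {
            enqueue(&tx, i);
        }
    });
    let mut received = Vec::new();
    while let Some(i) = rx.recv().await {
        received.push(i);
    }
    producer.join().unwrap();
    assert_eq!(received, (0..8).collect::<Vec<u64>>());
}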