Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
659 changes: 62 additions & 597 deletions crates/core/src/db/durability.rs

Large diffs are not rendered by default.

51 changes: 22 additions & 29 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::db::durability::DurabilityWorker;
use crate::db::durability::{request_durability, spawn_close as spawn_durability_close};
use crate::db::MetricsRecorderQueue;
use crate::error::{DBError, RestoreSnapshotError};
use crate::subscription::ExecutionCounters;
Expand All @@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk};
use spacetimedb_data_structures::map::HashSet;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
Expand Down Expand Up @@ -58,7 +58,6 @@ use spacetimedb_table::table::{RowRef, TableScanIter};
use spacetimedb_table::table_index::IndexKey;
use std::borrow::Cow;
use std::io;
use std::num::NonZeroUsize;
use std::ops::RangeBounds;
use std::sync::Arc;
use tokio::sync::watch;
Expand Down Expand Up @@ -99,7 +98,8 @@ pub struct RelationalDB {
owner_identity: Identity,

inner: Locking,
durability: Option<DurabilityWorker>,
durability: Option<Arc<Durability>>,
durability_runtime: Option<tokio::runtime::Handle>,
snapshot_worker: Option<SnapshotWorker>,

row_count_fn: RowCountFn,
Expand Down Expand Up @@ -134,8 +134,8 @@ impl std::fmt::Debug for RelationalDB {
impl Drop for RelationalDB {
fn drop(&mut self) {
// Attempt to flush the outstanding transactions.
if let Some(worker) = self.durability.take() {
worker.spawn_close(self.database_identity);
if let (Some(durability), Some(runtime)) = (self.durability.take(), self.durability_runtime.take()) {
spawn_durability_close(durability, &runtime, self.database_identity);
}
}
}
Expand All @@ -151,21 +151,12 @@ impl RelationalDB {
let workload_type_to_exec_counters =
Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity)));

let (durability, disk_size_fn, snapshot_worker, rt) = Persistence::unzip(persistence);
let durability = durability.zip(rt).map(|(durability, rt)| {
let next_tx_offset = {
let tx = inner.begin_tx(Workload::Internal);
let next_tx_offset = tx.tx_offset();
let _ = inner.release_tx(tx);
next_tx_offset.into_inner()
};
let reorder_window_size = NonZeroUsize::new(8).unwrap();
DurabilityWorker::new(database_identity, durability, rt, next_tx_offset, reorder_window_size)
});
let (durability, disk_size_fn, snapshot_worker, durability_runtime) = Persistence::unzip(persistence);

Self {
inner,
durability,
durability_runtime,
snapshot_worker,

database_identity,
Expand Down Expand Up @@ -814,33 +805,29 @@ impl RelationalDB {

let reducer_context = tx.ctx.reducer_context().cloned();
// TODO: Never returns `None` -- should it?
let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else {
let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx_and_then(tx, |tx_data| {
self.request_durability(reducer_context, tx_data);
})?
else {
return Ok(None);
};

self.maybe_do_snapshot(&tx_data);

let tx_data = Arc::new(tx_data);
if let Some(durability) = &self.durability {
durability.request_durability(reducer_context, &tx_data);
}

Ok(Some((tx_offset, tx_data, tx_metrics, reducer)))
}

#[tracing::instrument(level = "trace", skip_all)]
pub fn commit_tx_downgrade(&self, tx: MutTx, workload: Workload) -> (Arc<TxData>, TxMetrics, Tx) {
log::trace!("COMMIT MUT TX");

let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade(tx, workload);
let reducer_context = tx.ctx.reducer_context().cloned();
let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade_and_then(tx, workload, |tx_data| {
self.request_durability(reducer_context, tx_data);
});

self.maybe_do_snapshot(&tx_data);

let tx_data = Arc::new(tx_data);
if let Some(durability) = &self.durability {
durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data);
}

(tx_data, tx_metrics, tx)
}

Expand All @@ -852,6 +839,12 @@ impl RelationalDB {
.map(|durability| durability.durable_tx_offset())
}

/// Forward the committed `tx_data` (plus the optional `reducer_context`)
/// to the durability layer, if one is configured.
///
/// No-op when `self.durability` is `None` (e.g. a database running without
/// persistence).
fn request_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
    if let Some(durability) = &self.durability {
        // `request_durability` here is the free function imported from
        // `crate::db::durability`, not a recursive call to this method.
        request_durability(durability.as_ref(), reducer_context, tx_data);
    }
}

/// Decide based on the `committed_state.next_tx_offset`
/// whether to request that the [`SnapshotWorker`] in `self` capture a snapshot of the database.
///
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1900,7 +1900,7 @@ mod tests {
use spacetimedb_commitlog::{commitlog, repo};
use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap};
use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
use spacetimedb_durability::{Durability, EmptyHistory, Transaction, TxOffset};
use spacetimedb_durability::{Durability, EmptyHistory, TxOffset};
use spacetimedb_execution::dml::MutDatastore;
use spacetimedb_lib::bsatn::ToBsatn;
use spacetimedb_lib::db::auth::StAccess;
Expand Down Expand Up @@ -1987,7 +1987,8 @@ mod tests {
impl Durability for ManualDurability {
type TxData = Txdata;

fn append_tx(&self, tx: Transaction<Self::TxData>) {
fn append_tx(&self, tx: spacetimedb_durability::PreparedTx<Self::TxData>) {
let tx = tx.into_transaction();
let mut commitlog = self.commitlog.write().unwrap();
commitlog.commit([tx]).expect("commit failed");
commitlog.flush().expect("error flushing commitlog");
Expand Down
5 changes: 0 additions & 5 deletions crates/core/src/worker_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,6 @@ metrics_group!(
#[labels(database_identity: Identity)]
#[buckets(0.001, 0.01, 0.1, 1.0, 10.0)]
pub durability_blocking_send_duration: HistogramVec,

#[name = spacetime_durability_worker_reorder_window_length]
#[help = "The number of transactions currently being held in the reorder window"]
#[labels(db: Identity)]
pub durability_worker_reorder_window_length: IntGaugeVec,
}
);

Expand Down
21 changes: 21 additions & 0 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,27 @@ impl Locking {
pub fn commit_mut_tx_downgrade(&self, tx: MutTxId, workload: Workload) -> (TxData, TxMetrics, TxId) {
    // Thin wrapper: the commit-and-downgrade logic lives on `MutTxId`.
    tx.commit_downgrade(workload)
}

/// Commit `tx` and invoke `before_release` while the write lock is still held.
///
/// `before_release` receives the committed [`TxData`] wrapped in an [`Arc`];
/// running it under the write lock means it observes the commit before the
/// lock is released (e.g. to enqueue the data for durability in commit order).
///
/// NOTE(review): this currently always returns `Ok(Some(..))`; the
/// `Result<Option<..>>` shape presumably mirrors `commit_mut_tx` — confirm
/// whether `None`/`Err` can ever be produced.
#[allow(clippy::type_complexity)]
pub fn commit_mut_tx_and_then(
    &self,
    tx: MutTxId,
    before_release: impl FnOnce(&Arc<TxData>),
) -> Result<Option<(TxOffset, Arc<TxData>, TxMetrics, Option<ReducerName>)>> {
    Ok(Some(tx.commit_and_then(before_release)))
}

/// Commit `tx`, invoke `before_downgrade` while the write lock is still held,
/// then downgrade the lock to a read-only transaction.
///
/// Like [`Self::commit_mut_tx_and_then`], except that instead of releasing the
/// lock it returns a read-only [`TxId`] holding a shared lock on the committed
/// state.
pub fn commit_mut_tx_downgrade_and_then(
    &self,
    tx: MutTxId,
    workload: Workload,
    before_downgrade: impl FnOnce(&Arc<TxData>),
) -> (Arc<TxData>, TxMetrics, TxId) {
    tx.commit_downgrade_and_then(workload, before_downgrade)
}
}

#[derive(Debug, Error)]
Expand Down
31 changes: 29 additions & 2 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1953,7 +1953,17 @@ impl MutTxId {
/// - [`TxData`], the set of inserts and deletes performed by this transaction.
/// - [`TxMetrics`], various measurements of the work performed by this transaction.
/// - `String`, the name of the reducer which ran during this transaction.
pub(super) fn commit(mut self) -> (TxOffset, TxData, TxMetrics, Option<ReducerName>) {
pub(super) fn commit(self) -> (TxOffset, TxData, TxMetrics, Option<ReducerName>) {
    // Delegate to `commit_and_then` with a no-op callback, then unwrap the
    // `Arc` that it wraps around the `TxData`.
    let (tx_offset, tx_data, tx_metrics, reducer) = self.commit_and_then(|_| {});
    // The no-op callback cannot clone the `Arc`, so it is uniquely owned here
    // and `try_unwrap` succeeds. (`unwrap_or_else` + `panic!` rather than
    // `expect`, since `expect` would require `Arc<TxData>: Debug`.)
    let tx_data =
        Arc::try_unwrap(tx_data).unwrap_or_else(|_| panic!("noop commit callback must not retain tx data"));
    (tx_offset, tx_data, tx_metrics, reducer)
}

pub(super) fn commit_and_then(
mut self,
before_release: impl FnOnce(&Arc<TxData>),
) -> (TxOffset, Arc<TxData>, TxMetrics, Option<ReducerName>) {
let tx_offset = self.committed_state_write_lock.next_tx_offset;
let tx_data = self
.committed_state_write_lock
Expand Down Expand Up @@ -1987,6 +1997,9 @@ impl MutTxId {
tx_offset
};

let tx_data = Arc::new(tx_data);
before_release(&tx_data);

(tx_offset, tx_data, tx_metrics, reducer)
}

Expand All @@ -2003,7 +2016,18 @@ impl MutTxId {
/// - [`TxData`], the set of inserts and deletes performed by this transaction.
/// - [`TxMetrics`], various measurements of the work performed by this transaction.
/// - [`TxId`], a read-only transaction with a shared lock on the committed state.
pub(super) fn commit_downgrade(mut self, workload: Workload) -> (TxData, TxMetrics, TxId) {
pub(super) fn commit_downgrade(self, workload: Workload) -> (TxData, TxMetrics, TxId) {
    // Delegate to `commit_downgrade_and_then` with a no-op callback, then
    // unwrap the `Arc` it wraps around the `TxData`.
    let (tx_data, tx_metrics, tx) = self.commit_downgrade_and_then(workload, |_| {});
    // The no-op callback cannot clone the `Arc`, so it is uniquely owned here
    // and `try_unwrap` succeeds. (`unwrap_or_else` + `panic!` rather than
    // `expect`, since `expect` would require `Arc<TxData>: Debug`.)
    let tx_data =
        Arc::try_unwrap(tx_data).unwrap_or_else(|_| panic!("noop commit callback must not retain tx data"));
    (tx_data, tx_metrics, tx)
}

pub(super) fn commit_downgrade_and_then(
mut self,
workload: Workload,
before_downgrade: impl FnOnce(&Arc<TxData>),
) -> (Arc<TxData>, TxMetrics, TxId) {
let tx_data = self
.committed_state_write_lock
.merge(self.tx_state, self.read_sets, &self.ctx);
Expand All @@ -2022,6 +2046,9 @@ impl MutTxId {
&self.committed_state_write_lock,
);

let tx_data = Arc::new(tx_data);
before_downgrade(&tx_data);

// Update the workload type of the execution context
self.ctx.workload = workload.workload_type();
let tx = TxId {
Expand Down
50 changes: 35 additions & 15 deletions crates/durability/src/imp/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::{
};
use tracing::{instrument, Span};

use crate::{Close, Durability, DurableOffset, History, TxOffset};
use crate::{Close, Durability, DurableOffset, History, PreparedTx, TxOffset};

pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk};

Expand All @@ -38,7 +38,8 @@ pub struct Options {
/// transactions that are currently in the queue, but shrink the buffer to
/// `batch_capacity` if it had to make additional space during a burst.
///
/// The internal queue of [Local] is bounded to `2 * batch_capacity`.
/// The internal queue of [Local] is bounded to
/// `Options::QUEUE_CAPACITY_MULTIPLIER * batch_capacity`.
///
/// Default: 4096
pub batch_capacity: NonZeroUsize,
Expand All @@ -48,6 +49,11 @@ pub struct Options {

impl Options {
    /// Default value for [`Options::batch_capacity`].
    pub const DEFAULT_BATCH_CAPACITY: NonZeroUsize = NonZeroUsize::new(4096).unwrap();
    /// The internal transaction queue is bounded to
    /// `QUEUE_CAPACITY_MULTIPLIER * batch_capacity` entries.
    pub const QUEUE_CAPACITY_MULTIPLIER: usize = 4;

    /// Total bound of the internal transaction queue, derived from
    /// [`Self::QUEUE_CAPACITY_MULTIPLIER`] and [`Options::batch_capacity`].
    fn queue_capacity(self) -> usize {
        Self::QUEUE_CAPACITY_MULTIPLIER * self.batch_capacity.get()
    }
}

impl Default for Options {
Expand Down Expand Up @@ -89,9 +95,11 @@ pub struct Local<T> {
/// Backlog of transactions to be written to disk by the background
/// [`PersisterTask`].
///
/// The queue is bounded to `4 * Option::batch_capacity`.
queue: mpsc::Sender<Transaction<Txdata<T>>>,
/// How many transactions are sitting in the `queue`.
/// The queue is bounded to
/// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`.
queue: mpsc::Sender<PreparedTx<Txdata<T>>>,
/// How many transactions are pending durability, including items buffered
/// in the queue and items currently being written by the actor.
///
/// This is mainly for observability purposes, and can thus be updated with
/// relaxed memory ordering.
Expand Down Expand Up @@ -128,7 +136,8 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
opts.commitlog,
on_new_segment,
)?);
let (queue, txdata_rx) = mpsc::channel(4 * opts.batch_capacity.get());
let queue_capacity = opts.queue_capacity();
let (queue, txdata_rx) = mpsc::channel(queue_capacity);
let queue_depth = Arc::new(AtomicU64::new(0));
let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset());
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
Expand Down Expand Up @@ -209,7 +218,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
#[instrument(name = "durability::local::actor", skip_all)]
async fn run(
self,
mut transactions_rx: mpsc::Receiver<Transaction<Txdata<T>>>,
mut transactions_rx: mpsc::Receiver<PreparedTx<Txdata<T>>>,
mut shutdown_rx: mpsc::Receiver<oneshot::Sender<OwnedNotified>>,
) {
info!("starting durability actor");
Expand Down Expand Up @@ -239,11 +248,16 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
if n == 0 {
break;
}
self.queue_depth.fetch_sub(n as u64, Relaxed);
if tx_buf.is_empty() {
continue;
}

let clog = self.clog.clone();
tx_buf = spawn_blocking(move || -> io::Result<Vec<Transaction<Txdata<T>>>> {
let ready_len = tx_buf.len();
self.queue_depth.fetch_sub(ready_len as u64, Relaxed);
tx_buf = spawn_blocking(move || -> io::Result<Vec<PreparedTx<Txdata<T>>>> {
for tx in tx_buf.drain(..) {
clog.commit([tx])?;
clog.commit([tx.into_transaction()])?;
}
Ok(tx_buf)
})
Expand Down Expand Up @@ -329,24 +343,30 @@ impl Drop for Lock {
impl<T: Send + Sync + 'static> Durability for Local<T> {
type TxData = Txdata<T>;

fn append_tx(&self, tx: Transaction<Self::TxData>) {
match self.queue.try_reserve() {
Ok(permit) => permit.send(tx),
fn append_tx(&self, tx: PreparedTx<Self::TxData>) {
let mut tx = Some(tx);
let blocked = match self.queue.try_reserve() {
Ok(permit) => {
permit.send(tx.take().expect("tx already sent"));
false
}
Err(mpsc::error::TrySendError::Closed(_)) => {
panic!("durability actor crashed");
}
Err(mpsc::error::TrySendError::Full(_)) => {
let send = || self.queue.blocking_send(tx);
let mut send = || self.queue.blocking_send(tx.take().expect("tx already sent"));
if tokio::runtime::Handle::try_current().is_ok() {
tokio::task::block_in_place(send)
} else {
send()
}
.expect("durability actor crashed");
true
}
}
};

self.queue_depth.fetch_add(1, Relaxed);
let _ = blocked;
}

fn durable_tx_offset(&self) -> DurableOffset {
Expand Down
4 changes: 2 additions & 2 deletions crates/durability/src/imp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod testing {
use futures::FutureExt as _;
use tokio::sync::watch;

use crate::{Close, Durability, DurableOffset, Transaction, TxOffset};
use crate::{Close, Durability, DurableOffset, PreparedTx, TxOffset};

/// A [`Durability`] impl that sends all transactions into the void.
///
Expand All @@ -41,7 +41,7 @@ mod testing {
impl<T: Send + Sync> Durability for NoDurability<T> {
type TxData = T;

fn append_tx(&self, _: Transaction<Self::TxData>) {
fn append_tx(&self, _: PreparedTx<Self::TxData>) {
if self.closed.load(Ordering::Relaxed) {
panic!("`close` was called on this `NoDurability` instance");
}
Expand Down
Loading
Loading