From 8cd7b385cb7d9cb006f60ffdf275c5d1ba85bc53 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Sun, 15 Feb 2026 09:08:41 -0800 Subject: [PATCH] Trigger view refresh when a WASM procedure commits a transaction --- crates/client-api/src/routes/database.rs | 2 +- crates/core/src/host/host_controller.rs | 4 +- crates/core/src/host/instance_env.rs | 25 ++- crates/core/src/host/module_host.rs | 6 +- crates/core/src/host/v8/mod.rs | 3 + .../src/host/wasm_common/module_host_actor.rs | 123 ++++++----- .../src/host/wasmtime/wasm_instance_env.rs | 203 +++++++++++++++++- .../core/src/host/wasmtime/wasmtime_module.rs | 132 ++++++++---- .../src/locking_tx_datastore/mut_tx.rs | 2 +- .../modules/views-subscribe/src/lib.rs | 13 +- crates/smoketests/tests/views.rs | 34 +++ 11 files changed, 428 insertions(+), 119 deletions(-) diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index eab3e0a9262..b39ad36fe25 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -343,7 +343,7 @@ where let module_def = &module.info.module_def; let response_json = match version { SchemaVersion::V9 => { - let raw = RawModuleDefV9::from(module_def.clone()); + let raw = RawModuleDefV9::from(module_def.as_ref().clone()); axum::Json(sats::serde::SerdeWrapper(raw)).into_response() } }; diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index d03edb4bd6e..58932f5dfb8 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1282,8 +1282,8 @@ pub(crate) async fn extract_schema_with_pools( Host::try_init_in_memory_to_check(runtimes, page_pool, database, program, core, bsatn_rlb_pool).await?; // this should always succeed, but sometimes it doesn't let module_def = match Arc::try_unwrap(module_info) { - Ok(info) => info.module_def, - Err(info) => info.module_def.clone(), + Ok(info) => Arc::try_unwrap(info.module_def).unwrap_or_else(|module_def| (*module_def).clone()), + Err(info) => (*info.module_def).clone(), }; Ok(module_def) diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index a3f5bb9299c..bdb080b3c5f 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -259,6 +259,11 @@ impl InstanceEnv { self.func_name.as_deref() } + /// Swap in a temporary function type, returning the previous one. + pub fn swap_func_type(&mut self, func_type: FuncCallType) -> FuncCallType { + mem::replace(&mut self.func_type, func_type) + } + fn get_tx(&self) -> Result + '_, GetTxError> { self.tx.get() } @@ -733,10 +738,19 @@ impl InstanceEnv { // on `tokio::runtime::Handle::try_current()` before being able to run the `get_tx()` check. pub fn commit_mutable_tx(&mut self) -> Result<(), NodesError> { + let tx = self.take_mutable_tx_for_commit()?; + self.commit_procedure_tx(tx) + } + + /// Extract an anonymous mutable tx so callers can perform extra work before commit. + pub fn take_mutable_tx_for_commit(&mut self) -> Result { self.finish_anon_tx()?; + Ok(self.take_tx()?) + } + /// Commit an anonymous procedure tx and broadcast resulting updates. + pub fn commit_procedure_tx(&mut self, tx: MutTxId) -> Result<(), NodesError> { let stdb = self.relational_db().clone(); - let tx = self.take_tx()?; let subs = self.replica_ctx.subscriptions.clone(); let event = ModuleEvent { @@ -761,13 +775,16 @@ impl InstanceEnv { pub fn abort_mutable_tx(&mut self) -> Result<(), NodesError> { self.finish_anon_tx()?; - let stdb = self.relational_db().clone(); let tx = self.take_tx()?; + self.rollback_procedure_tx(tx); + Ok(()) + } - // Roll back the tx. + /// Roll back an anonymous procedure tx and record the resulting offset. + pub fn rollback_procedure_tx(&mut self, tx: MutTxId) { + let stdb = self.relational_db().clone(); let offset = ModuleSubscriptions::rollback_mut_tx(&stdb, tx); self.procedure_last_tx_offset = Some(from_tx_offset(offset)); - Ok(()) } /// In-case there is a anonymous tx at the end of a procedure, diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 39552b1385b..d2d943a21d9 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -219,7 +219,7 @@ pub struct ModuleInfo { /// The definition of the module. /// Loaded by loading the module's program from the system tables, extracting its definition, /// and validating. - pub module_def: ModuleDef, + pub module_def: Arc, /// The identity of the module. pub owner_identity: Identity, /// The identity of the database. @@ -292,7 +292,7 @@ impl ModuleInfo { ) -> Arc { let metrics = ModuleMetrics::new(&database_identity); Arc::new(ModuleInfo { - module_def, + module_def: Arc::new(module_def), owner_identity, database_identity, module_hash, @@ -1936,7 +1936,7 @@ impl ModuleHost { table_id, fn_ptr, sender, - } in out.tx.view_for_update().cloned().collect::>() + } in out.tx.views_for_refresh().cloned().collect::>() { let view_def = module_def .get_view_by_id(fn_ptr, sender.is_none()) diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index fdda70832cf..6e58666a248 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -43,6 +43,7 @@ use spacetimedb_datastore::locking_tx_datastore::FuncCallType; use spacetimedb_datastore::traits::Program; use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp}; use spacetimedb_schema::auto_migrate::MigrationPolicy; +use spacetimedb_schema::def::ModuleDef; use spacetimedb_schema::identifier::Identifier; use spacetimedb_table::static_assert_size; use std::panic::AssertUnwindSafe; @@ -853,6 +854,8 @@ impl WasmInstance for V8Instance<'_, '_, '_> { self.scope.get_slot::().unwrap().instance_env.tx.clone() } + fn set_module_def(&mut self, _: Arc) {} + fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ReducerExecuteResult { common_call(self.scope, budget, op, |scope, op| { Ok(call_call_reducer(scope, self.hooks, op, self.reducer_args_buf)?) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index d265191b9b8..b45a90ef402 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -82,6 +82,8 @@ pub trait WasmInstance { fn tx_slot(&self) -> TxSlot; + fn set_module_def(&mut self, module_def: Arc); + fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ReducerExecuteResult; fn call_view(&mut self, op: ViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult; @@ -114,7 +116,7 @@ impl EnergyStats { } } -fn deserialize_view_rows( +pub(crate) fn deserialize_view_rows( row_type: AlgebraicTypeRef, bytes: Bytes, typespace: &Typespace, @@ -143,6 +145,67 @@ fn deserialize_view_rows( .collect() } +pub(crate) fn run_query_for_view( + tx: &mut MutTxId, + the_query: &str, + expected_row_type: &ProductType, + call_info: &ViewCallInfo, + database_identity: Identity, +) -> anyhow::Result> { + if the_query.trim().is_empty() { + return Ok(Vec::new()); + } + + // Views bypass RLS, since views should enforce their own access control procedurally. + let auth = AuthCtx::for_current(database_identity); + let schema_view = SchemaViewer::new(&*tx, &auth); + + // Compile to subscription plans. + let (plans, has_params) = SubscriptionPlan::compile(the_query, &schema_view, &auth)?; + ensure!( + !has_params, + "parameterized SQL is not supported for view materialization yet" + ); + + // Validate shape and disallow views-on-views. + for plan in &plans { + let phys = plan.optimized_physical_plan(); + let Some(source_schema) = phys.return_table() else { + bail!("query does not return plain table rows"); + }; + if phys.reads_from_view(true) || phys.reads_from_view(false) { + bail!("view definition cannot read from other views"); + } + if source_schema.row_type != *expected_row_type { + bail!( + "query returns `{}` but view expects `{}`", + fmt_algebraic_type(&AlgebraicType::Product(source_schema.row_type.clone())), + fmt_algebraic_type(&AlgebraicType::Product(expected_row_type.clone())), + ); + } + } + + let op = FuncCallType::View(call_info.clone()); + let mut metrics = ExecutionMetrics::default(); + let mut rows = Vec::new(); + + for plan in plans { + // Track read sets for all tables involved in this plan. + // TODO(jsdt): This means we will rerun the view and query for any change to these tables, so we should optimize this asap. + for table_id in plan.table_ids() { + tx.record_table_scan(&op, table_id); + } + + let pipelined = PipelinedProject::from(plan.optimized_physical_plan().clone()); + pipelined.execute(&*tx, &mut metrics, &mut |row| { + rows.push(row.to_product_value()); + Ok(()) + })?; + } + + Ok(rows) +} + pub struct ExecutionTimings { pub total_duration: Duration, pub wasm_instance_env_call_times: CallTimes, @@ -333,8 +396,9 @@ impl WasmModuleHostActor { } impl WasmModuleHostActor { - fn make_from_instance(&self, instance: T::Instance) -> WasmModuleInstance { + fn make_from_instance(&self, mut instance: T::Instance) -> WasmModuleInstance { let common = InstanceCommon::new(&self.common); + instance.set_module_def(common.info().module_def.clone()); WasmModuleInstance { instance, common, @@ -1244,58 +1308,7 @@ impl InstanceCommon { expected_row_type: &ProductType, call_info: &ViewCallInfo, ) -> anyhow::Result> { - if the_query.trim().is_empty() { - return Ok(Vec::new()); - } - - // Views bypass RLS, since views should enforce their own access control procedurally. - let auth = AuthCtx::for_current(self.info.database_identity); - let schema_view = SchemaViewer::new(&*tx, &auth); - - // Compile to subscription plans. - let (plans, has_params) = SubscriptionPlan::compile(the_query, &schema_view, &auth)?; - ensure!( - !has_params, - "parameterized SQL is not supported for view materialization yet" - ); - - // Validate shape and disallow views-on-views. - for plan in &plans { - let phys = plan.optimized_physical_plan(); - let Some(source_schema) = phys.return_table() else { - bail!("query does not return plain table rows"); - }; - if phys.reads_from_view(true) || phys.reads_from_view(false) { - bail!("view definition cannot read from other views"); - } - if source_schema.row_type != *expected_row_type { - bail!( - "query returns `{}` but view expects `{}`", - fmt_algebraic_type(&AlgebraicType::Product(source_schema.row_type.clone())), - fmt_algebraic_type(&AlgebraicType::Product(expected_row_type.clone())), - ); - } - } - - let op = FuncCallType::View(call_info.clone()); - let mut metrics = ExecutionMetrics::default(); - let mut rows = Vec::new(); - - for plan in plans { - // Track read sets for all tables involved in this plan. - // TODO(jsdt): This means we will rerun the view and query for any change to these tables, so we should optimize this asap. - for table_id in plan.table_ids() { - tx.record_table_scan(&op, table_id); - } - - let pipelined = PipelinedProject::from(plan.optimized_physical_plan().clone()); - pipelined.execute(&*tx, &mut metrics, &mut |row| { - rows.push(row.to_product_value()); - Ok(()) - })?; - } - - Ok(rows) + run_query_for_view(tx, the_query, expected_row_type, call_info, self.info.database_identity) } /// A [`MutTxId`] knows which views must be updated (re-evaluated). /// This method re-evaluates them and updates their backing tables. @@ -1308,7 +1321,7 @@ impl InstanceCommon { timestamp: Timestamp, ) -> (ViewCallResult, bool) { let view_calls = tx - .view_for_update() + .views_for_refresh() .map(|info| { let view_def = module_def .get_view_by_id(info.fn_ptr, info.sender.is_none()) diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 6e648c82e5f..74a57b35e92 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -1,22 +1,30 @@ #![allow(clippy::too_many_arguments)] +use super::wasmtime_module::{ + call_view_export, decode_view_result_sink_code, CallViewAnonType, CallViewType, ViewResultSinkError, +}; use super::{Mem, MemView, NullableMemOp, WasmError, WasmPointee, WasmPtr}; use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record}; use crate::error::NodesError; use crate::host::instance_env::{ChunkPool, InstanceEnv}; use crate::host::wasm_common::instrumentation::{span, CallTimes}; -use crate::host::wasm_common::module_host_actor::ExecutionTimings; +use crate::host::wasm_common::module_host_actor::{ + deserialize_view_rows, run_query_for_view, ExecutionTimings, ViewResult, ViewReturnData, +}; use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, RowIters, TimingSpan, TimingSpanIdx, TimingSpanSet}; use crate::host::AbiCall; use crate::subscription::module_subscription_manager::TransactionOffset; -use anyhow::Context as _; +use anyhow::{anyhow, Context as _}; use spacetimedb_data_structures::map::IntMap; -use spacetimedb_datastore::locking_tx_datastore::FuncCallType; +use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; use spacetimedb_lib::{bsatn, ConnectionId, Timestamp}; +use spacetimedb_primitives::errno::HOST_CALL_FAILURE; use spacetimedb_primitives::{errno, ColId}; +use spacetimedb_schema::def::ModuleDef; use spacetimedb_schema::identifier::Identifier; use std::future::Future; use std::num::NonZeroU32; +use std::sync::Arc; use std::time::Instant; use wasmtime::{AsContext, Caller, StoreContextMut}; @@ -78,6 +86,15 @@ pub(super) struct WasmInstanceEnv { /// The database `InstanceEnv` associated to this instance. instance_env: InstanceEnv, + /// A validated `ModuleDef` for this instance used by procedures to refresh views. + module_def: Option>, + + /// A cached `__call_view__` export used by procedures to refresh views. + call_view: Option, + + /// A cached `__call_view_anon__` export used by procedures to refresh views. + call_view_anon: Option, + /// The `Mem` associated to this instance. At construction time, /// this is always `None`. The `Mem` instance is extracted from the /// instance exports, and after instantiation is complete, this will @@ -130,6 +147,9 @@ impl WasmInstanceEnv { pub fn new(instance_env: InstanceEnv) -> Self { Self { instance_env, + module_def: None, + call_view: None, + call_view_anon: None, mem: None, bytes_sources: IntMap::default(), next_bytes_source_id: NonZeroU32::new(1).unwrap(), @@ -187,6 +207,15 @@ impl WasmInstanceEnv { self.mem = Some(mem); } + pub fn set_module_def(&mut self, module_def: Arc) { + self.module_def = Some(module_def) + } + + pub fn set_call_view_exports(&mut self, call_view: Option, call_view_anon: Option) { + self.call_view = call_view; + self.call_view_anon = call_view_anon; + } + /// Returns a reference to the memory, assumed to be initialized. pub fn get_mem(&self) -> Mem { self.mem.expect("Initialized memory") @@ -1591,14 +1620,170 @@ impl WasmInstanceEnv { /// This currently does not happen as anonymous read transactions /// are not exposed to modules. pub fn procedure_commit_mut_tx<'caller>(caller: Caller<'caller, Self>) -> RtResult { - Self::with_span(caller, AbiCall::ProcedureCommitMutTransaction, |mut caller| { - let (_, env) = Self::mem_env(&mut caller); - - { - env.instance_env.commit_mutable_tx()?; + Self::with_span(caller, AbiCall::ProcedureCommitMutTransaction, |caller| { + let res: Result = (|| { + let tx = { + let env = caller.data_mut(); + env.instance_env.take_mutable_tx_for_commit().map_err(WasmError::from)? + }; + let tx = Self::refresh_views(caller, tx)?; + caller + .data_mut() + .instance_env + .commit_procedure_tx(tx) + .map_err(WasmError::from)?; Ok(0u16.into()) + })(); + res.or_else(|err| Self::convert_wasm_result(AbiCall::ProcedureCommitMutTransaction, err)) + }) + } + + /// Refresh all views made stale by a procedure `tx`. + /// + /// This runs each pending view call in the same mutable transaction and writes the refreshed rows + /// into the corresponding backing view tables. If any step fails (missing metadata, view execution, + /// row decoding, SQL execution, or materialization), this method rolls back `tx` and returns an error. + /// + /// On success, it returns the same transaction handle so the caller can commit it. + fn refresh_views<'a>(caller: &mut Caller<'a, Self>, tx: MutTxId) -> Result { + let Some(module_def) = caller.data().module_def.clone() else { + caller.data_mut().instance_env.rollback_procedure_tx(tx); + return Err(WasmError::Wasm(anyhow!( + "module definition is unavailable while committing a procedure transaction" + ))); + }; + + let views_for_refresh = tx.views_for_refresh().cloned().collect::>(); + let mut tx = Some(tx); + let mut tx_slot = caller.data().instance_env.tx.clone(); + + for view_call in views_for_refresh { + let res: anyhow::Result<()> = (|| { + let view_def = module_def + .get_view_by_id(view_call.fn_ptr, view_call.sender.is_none()) + .ok_or_else(|| anyhow!("view with fn_ptr `{}` not found", view_call.fn_ptr))?; + + let current_tx = tx.take().expect("procedure tx missing during view refresh"); + let (next_tx, call_result) = + tx_slot.set(current_tx, || Self::call_view(caller, &view_call, &view_def.name)); + tx = Some(next_tx); + let return_data = call_result?; + + let typespace = module_def.typespace(); + let row_product_type = typespace + .resolve(view_def.product_type_ref) + .resolve_refs()? + .into_product() + .map_err(|_| anyhow!("Error resolving row type for view"))?; + + let rows = match ViewResult::from_return_data(return_data)? { + ViewResult::Rows(bytes) => deserialize_view_rows(view_def.product_type_ref, bytes, typespace) + .map_err(|err| anyhow!(err.to_string()))?, + ViewResult::RawSql(query) => run_query_for_view( + tx.as_mut().expect("procedure tx missing while running view query"), + &query, + &row_product_type, + &view_call, + *caller.data().instance_env.database_identity(), + )?, + }; + + let stdb = caller.data().instance_env.relational_db().clone(); + match view_call.sender { + Some(sender) => stdb.materialize_view( + tx.as_mut() + .expect("procedure tx missing while materializing authenticated view"), + view_call.table_id, + sender, + rows, + )?, + None => stdb.materialize_anonymous_view( + tx.as_mut() + .expect("procedure tx missing while materializing anonymous view"), + view_call.table_id, + rows, + )?, + } + + Ok(()) + })(); + + if let Err(err) = res { + let tx = tx.expect("procedure tx missing while rolling back failed view refresh"); + caller.data_mut().instance_env.rollback_procedure_tx(tx); + return Err(WasmError::Wasm(err)); } - .or_else(|err| Self::convert_wasm_result(AbiCall::ProcedureCommitMutTransaction, err)) + } + + Ok(tx.expect("procedure tx missing after view refresh")) + } + + /// Execute a view and return its payload. + /// + /// This helper is used by [`Self::refresh_views`] while a procedure transaction is being committed. + /// It temporarily sets the active function type to the target view for dependency tracking, + /// invokes the cached typed view export, restores the previous function type, and decodes the + /// result sink into [`ViewReturnData`]. + fn call_view<'a>( + caller: &mut Caller<'a, Self>, + view_call: &ViewCallInfo, + view_name: &Identifier, + ) -> anyhow::Result { + // Preserve the procedure's result/error sink so this view does not overwrite it. + let previous_standard_sink = { + let env = caller.data_mut(); + env.standard_bytes_sink.take() + }; + + let prev_func_type = caller + .data_mut() + .instance_env + .swap_func_type(FuncCallType::View(view_call.clone())); + + let call_result = (|| -> anyhow::Result { + let (args_source, result_sink) = { + let env = caller.data_mut(); + let args_source = env.create_bytes_source(bytes::Bytes::new())?; + let result_sink = env.setup_standard_bytes_sink(); + (args_source, result_sink) + }; + + let (call_view, call_view_anon) = { + let env = caller.data(); + (env.call_view.clone(), env.call_view_anon.clone()) + }; + + let code = call_view_export( + &mut *caller, + call_view, + call_view_anon, + view_name, + view_call.fn_ptr.0, + view_call.sender, + args_source.0, + result_sink, + )?; + + Ok(code) + })(); + + caller.data_mut().instance_env.swap_func_type(prev_func_type); + + let result_bytes = { + let env = caller.data_mut(); + // Restore the outer sink of the procedure before propagating any trap/user error from the call. + let result = env.take_standard_bytes_sink(); + env.standard_bytes_sink = previous_standard_sink; + result + }; + let code = call_result?; + + decode_view_result_sink_code(code, result_bytes).map_err(|err| match err { + ViewResultSinkError::User(err) => anyhow!("view call failed: {err}"), + ViewResultSinkError::UnexpectedCode(code) => anyhow!( + "unexpected return code {code} from view call, expected 0, 2, or {failure}", + failure = HOST_CALL_FAILURE.get() + ), }) } diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index be4a685b1d8..48ac0fe80e2 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -19,6 +19,8 @@ use futures_util::FutureExt; use spacetimedb_datastore::locking_tx_datastore::FuncCallType; use spacetimedb_lib::{bsatn, ConnectionId, Identity, RawModuleDef}; use spacetimedb_primitives::errno::HOST_CALL_FAILURE; +use spacetimedb_schema::def::ModuleDef; +use spacetimedb_schema::identifier::Identifier; use wasmtime::{ AsContext, AsContextMut, ExternType, Instance, InstancePre, Linker, Store, TypedFunc, WasmBacktrace, WasmParams, WasmResults, @@ -99,6 +101,11 @@ fn handle_error_sink_code(code: i32, error: Vec) -> Result<(), ExecutionErro handle_result_sink_code(code, error).map(drop) } +pub(super) enum ViewResultSinkError { + User(String), + UnexpectedCode(i32), +} + /// Handle the return code from a function using a result sink. /// /// On success, returns the result bytes. @@ -114,11 +121,18 @@ fn handle_result_sink_code(code: i32, result: Vec) -> Result, Execut /// Handle the return code from a view function using a result sink. /// For views, we treat the return code 2 as a successful return using the header format. fn handle_view_result_sink_code(code: i32, result: Vec) -> Result { + decode_view_result_sink_code(code, result).map_err(|err| match err { + ViewResultSinkError::User(err) => ExecutionError::User(err.into()), + ViewResultSinkError::UnexpectedCode(_) => ExecutionError::Recoverable(anyhow::anyhow!("unknown return code")), + }) +} + +pub(super) fn decode_view_result_sink_code(code: i32, result: Vec) -> Result { match code { 0 => Ok(ViewReturnData::Rows(result.into())), 2 => Ok(ViewReturnData::HeaderFirst(result.into())), - CALL_FAILURE => Err(ExecutionError::User(string_from_utf8_lossy_owned(result).into())), - _ => Err(ExecutionError::Recoverable(anyhow::anyhow!("unknown return code"))), + CALL_FAILURE => Err(ViewResultSinkError::User(string_from_utf8_lossy_owned(result))), + _ => Err(ViewResultSinkError::UnexpectedCode(code)), } } @@ -131,20 +145,63 @@ const CALL_FAILURE: i32 = HOST_CALL_FAILURE.get() as i32; /// However, most of the WASM we execute, incl. reducers and startup functions, should never block/yield. /// Rather than crossing our fingers and trusting, we run [`TypedFunc::call_async`] in [`FutureExt::now_or_never`], /// an "async executor" which invokes [`std::task::Future::poll`] exactly once. -fn call_sync_typed_func( +pub(super) fn call_sync_typed_func( typed_func: &TypedFunc, - store: &mut Store, + mut ctx: impl AsContextMut, args: Args, ) -> anyhow::Result where Args: WasmParams + Sync, Ret: WasmResults + Sync, { - let fut = typed_func.call_async(store, args); + let fut = typed_func.call_async(ctx.as_context_mut(), args); fut.now_or_never() .expect("`call_async` of supposedly synchronous WASM function returned `Poll::Pending`") } +#[allow(clippy::too_many_arguments)] +pub(super) fn call_view_export( + mut ctx: impl AsContextMut, + call_view: Option, + call_view_anon: Option, + view_name: &Identifier, + fn_ptr: u32, + sender: Option, + args_source: u32, + result_sink: u32, +) -> anyhow::Result { + if let Some(sender) = sender { + let [sender_0, sender_1, sender_2, sender_3] = prepare_identity_for_call(sender); + let call_view = call_view.ok_or_else(|| { + anyhow::anyhow!( + "Module defines view {} but does not export `{}`", + view_name, + CALL_VIEW_DUNDER + ) + })?; + + call_sync_typed_func( + &call_view, + ctx.as_context_mut(), + (fn_ptr, sender_0, sender_1, sender_2, sender_3, args_source, result_sink), + ) + } else { + let call_view_anon = call_view_anon.ok_or_else(|| { + anyhow::anyhow!( + "Module defines anonymous view {} but does not export `{}`", + view_name, + CALL_VIEW_ANON_DUNDER, + ) + })?; + + call_sync_typed_func( + &call_view_anon, + ctx.as_context_mut(), + (fn_ptr, args_source, result_sink), + ) + } +} + impl module_host_actor::WasmInstancePre for WasmtimeModule { type Instance = WasmtimeInstance; @@ -209,6 +266,9 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { let call_procedure = get_call_procedure(&mut store, &instance); let call_view = get_call_view(&mut store, &instance); let call_view_anon = get_call_view_anon(&mut store, &instance); + store + .data_mut() + .set_call_view_exports(call_view.clone(), call_view_anon.clone()); Ok(WasmtimeInstance { store, @@ -308,7 +368,7 @@ type CallReducerType = TypedFunc< >; /// The function signature of `__call_view__` -type CallViewType = TypedFunc< +pub(super) type CallViewType = TypedFunc< ( // ViewId u32, @@ -329,7 +389,7 @@ type CallViewType = TypedFunc< >; /// The function signature of `__call_view_anon__` -type CallViewAnonType = TypedFunc< +pub(super) type CallViewAnonType = TypedFunc< ( // ViewId u32, @@ -381,6 +441,10 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { self.store.data().instance_env().tx.clone() } + fn set_module_def(&mut self, module_def: Arc) { + self.store.data_mut().set_module_def(module_def); + } + #[tracing::instrument(level = "trace", skip_all)] fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> module_host_actor::ReducerExecuteResult { let store = &mut self.store; @@ -431,8 +495,6 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let store = &mut self.store; prepare_store_for_call(store, budget); - // Prepare sender identity and connection ID, as LITTLE-ENDIAN byte arrays. - let [sender_0, sender_1, sender_2, sender_3] = prepare_identity_for_call(*op.sender); // Prepare arguments to the reducer + the error sink & start timings. let args_bytes = op.args.get_bsatn().clone(); @@ -441,29 +503,15 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { .data_mut() .start_funcall(op.name.clone(), args_bytes, op.timestamp, op.call_type()); - let Some(call_view) = self.call_view.as_ref() else { - return module_host_actor::ViewExecuteResult { - stats: zero_execution_stats(store), - call_result: Err(ExecutionError::Recoverable(anyhow::anyhow!( - "Module defines view {} but does not export `{}`", - op.name, - CALL_VIEW_DUNDER, - ))), - }; - }; - - let call_result = call_sync_typed_func( - call_view, + let call_result = call_view_export( &mut *store, - ( - op.fn_ptr.0, - sender_0, - sender_1, - sender_2, - sender_3, - args_source.0, - errors_sink, - ), + self.call_view.clone(), + self.call_view_anon.clone(), + op.name, + op.fn_ptr.0, + Some(*op.sender), + args_source.0, + errors_sink, ); let (stats, result_bytes) = finish_opcall(store, budget); @@ -491,18 +539,16 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { .data_mut() .start_funcall(op.name.clone(), args_bytes, op.timestamp, op.call_type()); - let Some(call_view_anon) = self.call_view_anon.as_ref() else { - return module_host_actor::ViewExecuteResult { - stats: zero_execution_stats(store), - call_result: Err(ExecutionError::Recoverable(anyhow::anyhow!( - "Module defines anonymous view {} but does not export `{}`", - op.name, - CALL_VIEW_ANON_DUNDER, - ))), - }; - }; - - let call_result = call_sync_typed_func(call_view_anon, &mut *store, (op.fn_ptr.0, args_source.0, errors_sink)); + let call_result = call_view_export( + &mut *store, + self.call_view.clone(), + self.call_view_anon.clone(), + op.name, + op.fn_ptr.0, + None, + args_source.0, + errors_sink, + ); let (stats, result_bytes) = finish_opcall(store, budget); diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 101a5feb753..14fc0b9c3ea 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -343,7 +343,7 @@ impl MutTxId { } /// Returns the views whose read sets overlaps with this transaction's write set - pub fn view_for_update(&self) -> impl Iterator + '_ { + pub fn views_for_refresh(&self) -> impl Iterator + '_ { // Return early if there are no views. // This is profitable as the method is also called for reducers. if self.committed_state_write_lock.has_no_views_for_table_scans() { diff --git a/crates/smoketests/modules/views-subscribe/src/lib.rs b/crates/smoketests/modules/views-subscribe/src/lib.rs index acb6ef7348a..3f25760da74 100644 --- a/crates/smoketests/modules/views-subscribe/src/lib.rs +++ b/crates/smoketests/modules/views-subscribe/src/lib.rs @@ -1,4 +1,4 @@ -use spacetimedb::{Identity, ReducerContext, Table, ViewContext}; +use spacetimedb::{Identity, ProcedureContext, ReducerContext, Table, ViewContext}; #[spacetimedb::table(name = player_state)] pub struct PlayerState { @@ -20,3 +20,14 @@ pub fn insert_player(ctx: &ReducerContext, name: String) { identity: ctx.sender(), }); } + +#[spacetimedb::procedure] +pub fn insert_player_proc(ctx: &mut ProcedureContext, name: String) { + let sender = ctx.sender(); + ctx.with_tx(|tx| { + tx.db.player_state().insert(PlayerState { + name: name.clone(), + identity: sender, + }); + }); +} diff --git a/crates/smoketests/tests/views.rs b/crates/smoketests/tests/views.rs index 2d5f43917a0..cc30ad7bd5a 100644 --- a/crates/smoketests/tests/views.rs +++ b/crates/smoketests/tests/views.rs @@ -402,3 +402,37 @@ fn test_where_expr_view() { 2 | "BOB" | 20"#, ); } + +#[test] +fn test_procedure_triggers_subscription_updates() { + let test = Smoketest::builder().precompiled_module("views-subscribe").build(); + let sub = test.subscribe_background(&["select * from my_player"], 1).unwrap(); + test.call("insert_player_proc", &["Alice"]).unwrap(); + let events = sub.collect().unwrap(); + + let projection: Vec = events + .into_iter() + .map(|event| { + let deletes = event["my_player"]["deletes"] + .as_array() + .unwrap() + .iter() + .map(|row| json!({"name": row["name"]})) + .collect::>(); + let inserts = event["my_player"]["inserts"] + .as_array() + .unwrap() + .iter() + .map(|row| json!({"name": row["name"]})) + .collect::>(); + json!({"my_player": {"deletes": deletes, "inserts": inserts}}) + }) + .collect(); + + assert_eq!( + serde_json::json!(projection), + serde_json::json!([ + {"my_player": {"deletes": [], "inserts": [{"name": "Alice"}]}} + ]) + ); +}