From b1282c7b5140104242add84748a113a069149d14 Mon Sep 17 00:00:00 2001 From: clockwork-labs-bot Date: Sun, 15 Feb 2026 11:01:46 -0500 Subject: [PATCH 1/2] fix: re-materialize views during procedure commit (re-entrant) When a procedure commits a transaction via procedure_commit_mut_tx, we now check for dirty views (views whose read sets overlap with the transaction's write set) and re-evaluate them re-entrantly before committing. This ensures that views are kept up-to-date when procedures modify data they depend on. Implementation details: - Store call_view/call_view_anon TypedFunc handles and module_def on WasmInstanceEnv for re-entrant access during procedure commits - Factor out evaluate_view_sql as a standalone pub(crate) function from InstanceCommon::run_query_for_view - In procedure_commit_mut_tx, iterate dirty views, put tx back in TxSlot for each view call, call view function re-entrantly via TypedFunc::call_async/now_or_never, process results (BSATN rows or RawSql), and materialize before final commit - Add set_module_def to WasmInstance trait for post-instantiation module definition injection - V8 backend: added TODO for equivalent implementation Fixes #4296 --- crates/core/src/host/instance_env.rs | 4 +- crates/core/src/host/v8/syscall/common.rs | 3 + .../src/host/wasm_common/module_host_actor.rs | 119 +++++---- .../src/host/wasmtime/wasm_instance_env.rs | 228 +++++++++++++++++- .../core/src/host/wasmtime/wasmtime_module.rs | 12 +- 5 files changed, 308 insertions(+), 58 deletions(-) diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index a3f5bb9299c..218cbf8b491 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -52,7 +52,7 @@ pub struct InstanceEnv { /// Are we in an anonymous tx context? in_anon_tx: bool, /// A procedure's last known transaction offset. - procedure_last_tx_offset: Option, + pub(crate) procedure_last_tx_offset: Option, } /// `InstanceEnv` needs to be `Send` because it is created on the host thread @@ -715,7 +715,7 @@ impl InstanceEnv { /// Finishes an anonymous transaction, /// returning `Some(_)` if there was no ongoing one, /// in which case the caller should return early. - fn finish_anon_tx(&mut self) -> Result<(), NodesError> { + pub(crate) fn finish_anon_tx(&mut self) -> Result<(), NodesError> { if self.in_anon_tx { self.in_anon_tx = false; Ok(()) diff --git a/crates/core/src/host/v8/syscall/common.rs b/crates/core/src/host/v8/syscall/common.rs index ee297ab3583..b084f2456ad 100644 --- a/crates/core/src/host/v8/syscall/common.rs +++ b/crates/core/src/host/v8/syscall/common.rs @@ -654,6 +654,9 @@ pub fn procedure_commit_mut_tx( ) -> SysCallResult<()> { let env = get_env(scope)?; + // TODO(#4296): V8 needs equivalent re-entrant view evaluation during procedure commits. + // The wasmtime backend evaluates dirty views re-entrantly before committing. + // V8 currently skips this step and just commits directly. env.instance_env.commit_mutable_tx()?; Ok(()) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index d265191b9b8..9e777bbd079 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -82,6 +82,10 @@ pub trait WasmInstance { fn tx_slot(&self) -> TxSlot; + /// Set the module definition on this instance. + /// This is needed for re-entrant view evaluation during procedure commits. + fn set_module_def(&mut self, _module_def: Arc) {} + fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ReducerExecuteResult; fn call_view(&mut self, op: ViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult; @@ -114,7 +118,7 @@ impl EnergyStats { } } -fn deserialize_view_rows( +pub(crate) fn deserialize_view_rows( row_type: AlgebraicTypeRef, bytes: Bytes, typespace: &Typespace, @@ -333,8 +337,11 @@ impl WasmModuleHostActor { } impl WasmModuleHostActor { - fn make_from_instance(&self, instance: T::Instance) -> WasmModuleInstance { + fn make_from_instance(&self, mut instance: T::Instance) -> WasmModuleInstance { let common = InstanceCommon::new(&self.common); + // Provide the module definition to the instance so that + // re-entrant view evaluation can look up view metadata during procedure commits. + instance.set_module_def(Arc::new(common.info().module_def.clone())); WasmModuleInstance { instance, common, @@ -1244,59 +1251,75 @@ impl InstanceCommon { expected_row_type: &ProductType, call_info: &ViewCallInfo, ) -> anyhow::Result> { - if the_query.trim().is_empty() { - return Ok(Vec::new()); - } - - // Views bypass RLS, since views should enforce their own access control procedurally. - let auth = AuthCtx::for_current(self.info.database_identity); - let schema_view = SchemaViewer::new(&*tx, &auth); - - // Compile to subscription plans. - let (plans, has_params) = SubscriptionPlan::compile(the_query, &schema_view, &auth)?; - ensure!( - !has_params, - "parameterized SQL is not supported for view materialization yet" - ); + evaluate_view_sql(tx, the_query, expected_row_type, call_info, self.info.database_identity) + } +} - // Validate shape and disallow views-on-views. - for plan in &plans { - let phys = plan.optimized_physical_plan(); - let Some(source_schema) = phys.return_table() else { - bail!("query does not return plain table rows"); - }; - if phys.reads_from_view(true) || phys.reads_from_view(false) { - bail!("view definition cannot read from other views"); - } - if source_schema.row_type != *expected_row_type { - bail!( - "query returns `{}` but view expects `{}`", - fmt_algebraic_type(&AlgebraicType::Product(source_schema.row_type.clone())), - fmt_algebraic_type(&AlgebraicType::Product(expected_row_type.clone())), - ); - } +/// Compiles and runs a SQL query that was returned from a view. +/// This is a standalone function extracted from `InstanceCommon::run_query_for_view` +/// so it can be called from re-entrant view evaluation during procedure commits. +pub(crate) fn evaluate_view_sql( + tx: &mut MutTxId, + the_query: &str, + expected_row_type: &ProductType, + call_info: &ViewCallInfo, + database_identity: Identity, +) -> anyhow::Result> { + if the_query.trim().is_empty() { + return Ok(Vec::new()); + } + + // Views bypass RLS, since views should enforce their own access control procedurally. + let auth = AuthCtx::for_current(database_identity); + let schema_view = SchemaViewer::new(&*tx, &auth); + + // Compile to subscription plans. + let (plans, has_params) = SubscriptionPlan::compile(the_query, &schema_view, &auth)?; + ensure!( + !has_params, + "parameterized SQL is not supported for view materialization yet" + ); + + // Validate shape and disallow views-on-views. + for plan in &plans { + let phys = plan.optimized_physical_plan(); + let Some(source_schema) = phys.return_table() else { + bail!("query does not return plain table rows"); + }; + if phys.reads_from_view(true) || phys.reads_from_view(false) { + bail!("view definition cannot read from other views"); } + if source_schema.row_type != *expected_row_type { + bail!( + "query returns `{}` but view expects `{}`", + fmt_algebraic_type(&AlgebraicType::Product(source_schema.row_type.clone())), + fmt_algebraic_type(&AlgebraicType::Product(expected_row_type.clone())), + ); + } + } - let op = FuncCallType::View(call_info.clone()); - let mut metrics = ExecutionMetrics::default(); - let mut rows = Vec::new(); - - for plan in plans { - // Track read sets for all tables involved in this plan. - // TODO(jsdt): This means we will rerun the view and query for any change to these tables, so we should optimize this asap. - for table_id in plan.table_ids() { - tx.record_table_scan(&op, table_id); - } + let op = FuncCallType::View(call_info.clone()); + let mut metrics = ExecutionMetrics::default(); + let mut rows = Vec::new(); - let pipelined = PipelinedProject::from(plan.optimized_physical_plan().clone()); - pipelined.execute(&*tx, &mut metrics, &mut |row| { - rows.push(row.to_product_value()); - Ok(()) - })?; + for plan in plans { + // Track read sets for all tables involved in this plan. + // TODO(jsdt): This means we will rerun the view and query for any change to these tables, so we should optimize this asap. + for table_id in plan.table_ids() { + tx.record_table_scan(&op, table_id); } - Ok(rows) + let pipelined = PipelinedProject::from(plan.optimized_physical_plan().clone()); + pipelined.execute(&*tx, &mut metrics, &mut |row| { + rows.push(row.to_product_value()); + Ok(()) + })?; } + + Ok(rows) +} + +impl InstanceCommon { /// A [`MutTxId`] knows which views must be updated (re-evaluated). /// This method re-evaluates them and updates their backing tables. pub(crate) fn call_views_with_tx( diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 6e648c82e5f..e07ace52a0f 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -14,11 +14,19 @@ use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::locking_tx_datastore::FuncCallType; use spacetimedb_lib::{bsatn, ConnectionId, Timestamp}; use spacetimedb_primitives::{errno, ColId}; +use spacetimedb_schema::def::ModuleDef; use spacetimedb_schema::identifier::Identifier; use std::future::Future; use std::num::NonZeroU32; +use std::sync::Arc; use std::time::Instant; -use wasmtime::{AsContext, Caller, StoreContextMut}; +use wasmtime::{AsContext, Caller, StoreContextMut, TypedFunc}; + +/// The function signature of `__call_view__` (re-exported from wasmtime_module for re-entrant calls). +pub(super) type CallViewType = TypedFunc<(u32, u64, u64, u64, u64, u32, u32), i32>; + +/// The function signature of `__call_view_anon__` (re-exported from wasmtime_module for re-entrant calls). +pub(super) type CallViewAnonType = TypedFunc<(u32, u32, u32), i32>; /// A stream of bytes which the WASM module can read from /// using [`WasmInstanceEnv::bytes_source_read`]. @@ -116,6 +124,15 @@ pub(super) struct WasmInstanceEnv { /// A pool of unused allocated chunks that can be reused. // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`. chunk_pool: ChunkPool, + + /// Cached handle to `__call_view__` for re-entrant view calls during procedure commits. + pub(super) call_view: Option, + + /// Cached handle to `__call_view_anon__` for re-entrant anonymous view calls during procedure commits. + pub(super) call_view_anon: Option, + + /// The module definition, needed to look up view metadata during re-entrant view evaluation. + pub(super) module_def: Option>, } const STANDARD_BYTES_SINK: u32 = 1; @@ -138,6 +155,9 @@ impl WasmInstanceEnv { timing_spans: Default::default(), call_times: CallTimes::new(), chunk_pool: <_>::default(), + call_view: None, + call_view_anon: None, + module_def: None, } } @@ -1592,14 +1612,208 @@ impl WasmInstanceEnv { /// are not exposed to modules. pub fn procedure_commit_mut_tx<'caller>(caller: Caller<'caller, Self>) -> RtResult { Self::with_span(caller, AbiCall::ProcedureCommitMutTransaction, |mut caller| { - let (_, env) = Self::mem_env(&mut caller); + Self::procedure_commit_mut_tx_inner(&mut caller) + .map_err(WasmError::Db) + .or_else(|err| Self::convert_wasm_result(AbiCall::ProcedureCommitMutTransaction, err)) + }) + } + + /// Inner implementation of procedure_commit_mut_tx that evaluates dirty views + /// re-entrantly before committing the transaction. + fn procedure_commit_mut_tx_inner(caller: &mut Caller<'_, Self>) -> Result { + use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall}; + use crate::host::wasm_common::module_host_actor::ViewReturnData; + use crate::subscription::module_subscription_actor::commit_and_broadcast_event; + use futures_util::FutureExt as _; + use spacetimedb_client_api_messages::energy::EnergyQuanta; + use spacetimedb_datastore::locking_tx_datastore::ViewCallInfo; + use spacetimedb_lib::Timestamp; + use std::time::Duration; + + let env = caller.data_mut(); + + // Finish the anonymous tx context (validates we're in one). + env.instance_env.finish_anon_tx()?; + + // Take the transaction from the slot. + let tx = env.instance_env.take_tx()?; + + // Collect the set of dirty views that need re-evaluation. + let dirty_views: Vec = tx.view_for_update().cloned().collect(); + + // Get module_def and database identity for view evaluation. + let module_def = env.module_def.clone(); + let replica_ctx = env.instance_env.replica_ctx.clone(); + let stdb = replica_ctx.relational_db.clone(); + let database_identity = *env.instance_env.database_identity(); + let call_view_func = env.call_view.clone(); + let call_view_anon_func = env.call_view_anon.clone(); + let mut tx_slot = env.instance_env.tx.clone(); + let _timestamp = env.instance_env.start_time; + + // If there are no dirty views, skip re-entrant evaluation. + let tx = if dirty_views.is_empty() || module_def.is_none() { + tx + } else { + let module_def = module_def.as_ref().unwrap(); + let mut current_tx = tx; + + for view_info in &dirty_views { + let is_anonymous = view_info.sender.is_none(); + let view_def = module_def + .get_view_by_id(view_info.fn_ptr, is_anonymous) + .unwrap_or_else(|| { + panic!( + "view with fn_ptr `{}` (anonymous={}) not found in module_def", + view_info.fn_ptr, is_anonymous + ) + }); + + let view_name = view_def.name.clone(); + let row_type = view_def.product_type_ref; + + // Put the tx back in the slot so the view function can access it. + let call_view_ref = &call_view_func; + let call_view_anon_ref = &call_view_anon_func; + let (mut returned_tx, call_result) = tx_slot.set(current_tx, || { + // Set up bytes source (empty args) and result sink. + let args_bytes = bytes::Bytes::new(); // Views take no args. + + let env = caller.data_mut(); + let args_source = env.create_bytes_source(args_bytes).unwrap_or(BytesSourceId::INVALID); + let result_sink = env.setup_standard_bytes_sink(); + + // Call the view function re-entrantly. + let code = if let Some(sender) = &view_info.sender { + // Non-anonymous view: use call_view with sender identity. + let Some(call_view) = call_view_ref else { + log::error!("No __call_view__ export but view `{view_name}` needs evaluation"); + return Err(anyhow::anyhow!("No __call_view__ export")); + }; + let [s0, s1, s2, s3] = super::wasmtime_module::prepare_identity_for_call(*sender); + call_view + .call_async( + &mut *caller, + (view_info.fn_ptr.0, s0, s1, s2, s3, args_source.0, result_sink), + ) + .now_or_never() + .expect("view call should not yield") + } else { + // Anonymous view: use call_view_anon. + let Some(call_view_anon) = call_view_anon_ref else { + log::error!( + "No __call_view_anon__ export but anonymous view `{view_name}` needs evaluation" + ); + return Err(anyhow::anyhow!("No __call_view_anon__ export")); + }; + call_view_anon + .call_async(&mut *caller, (view_info.fn_ptr.0, args_source.0, result_sink)) + .now_or_never() + .expect("anonymous view call should not yield") + }; + + // Get the result bytes. + let result_bytes = caller.data_mut().take_standard_bytes_sink(); + + // Process the return code. + let code = match code { + Ok(c) => c, + Err(e) => return Err(e), + }; + + // Parse the view result. + let view_return_data = match code { + 0 => ViewReturnData::Rows(result_bytes.into()), + 2 => ViewReturnData::HeaderFirst(result_bytes.into()), + _ => { + return Err(anyhow::anyhow!("view `{view_name}` returned unexpected code {code}")); + } + }; + + Ok(view_return_data) + }); + + // Process the view result and materialize rows. + if let Err(e) = Self::process_view_result_and_materialize( + call_result, + &view_name, + row_type, + view_info, + module_def, + &stdb, + &mut returned_tx, + database_identity, + ) { + log::error!("Error processing view `{view_name}` during procedure commit: {e:?}"); + } - { - env.instance_env.commit_mutable_tx()?; - Ok(0u16.into()) + current_tx = returned_tx; } - .or_else(|err| Self::convert_wasm_result(AbiCall::ProcedureCommitMutTransaction, err)) - }) + + current_tx + }; + + // Now commit the transaction (with any view materializations included). + let subs = replica_ctx.subscriptions.clone(); + let event = ModuleEvent { + timestamp: Timestamp::now(), + caller_identity: stdb.database_identity(), + caller_connection_id: None, + function_call: ModuleFunctionCall::default(), + status: EventStatus::Committed(DatabaseUpdate::default()), + reducer_return_value: None, + request_id: None, + timer: None, + energy_quanta_used: EnergyQuanta { quanta: 0 }, + host_execution_duration: Duration::from_millis(0), + }; + let event = commit_and_broadcast_event(&subs, None, event, tx); + caller.data_mut().instance_env.procedure_last_tx_offset = Some(event.tx_offset); + + Ok(0u16.into()) + } + + /// Process a view's return data: deserialize rows or execute SQL, then materialize. + #[allow(clippy::too_many_arguments)] + fn process_view_result_and_materialize( + call_result: Result, + _view_name: &spacetimedb_schema::identifier::Identifier, + row_type: spacetimedb_sats::AlgebraicTypeRef, + view_info: &spacetimedb_datastore::locking_tx_datastore::ViewCallInfo, + module_def: &ModuleDef, + stdb: &crate::db::relational_db::RelationalDB, + tx: &mut spacetimedb_datastore::locking_tx_datastore::MutTxId, + database_identity: spacetimedb_lib::Identity, + ) -> anyhow::Result<()> { + use crate::host::wasm_common::module_host_actor::{evaluate_view_sql, ViewResult}; + + let view_return_data = call_result?; + let view_result = ViewResult::from_return_data(view_return_data)?; + let typespace = module_def.typespace(); + + let row_product_type = typespace + .resolve(row_type) + .resolve_refs() + .map_err(|e| anyhow::anyhow!("Error resolving row type: {e}"))? + .into_product() + .map_err(|_| anyhow::anyhow!("Row type is not a product type"))?; + + let rows = match view_result { + ViewResult::Rows(bytes) => { + crate::host::wasm_common::module_host_actor::deserialize_view_rows(row_type, bytes, typespace) + .map_err(|e| anyhow::anyhow!(e))? + } + ViewResult::RawSql(query) => { + evaluate_view_sql(tx, &query, &row_product_type, view_info, database_identity)? + } + }; + + match &view_info.sender { + Some(sender) => stdb.materialize_view(tx, view_info.table_id, *sender, rows)?, + None => stdb.materialize_anonymous_view(tx, view_info.table_id, rows)?, + }; + + Ok(()) } /// Aborts a mutable transaction, diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index be4a685b1d8..8fd8783e5d2 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -210,6 +210,12 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { let call_view = get_call_view(&mut store, &instance); let call_view_anon = get_call_view_anon(&mut store, &instance); + // Store copies of view function handles in the WasmInstanceEnv + // so that procedure_commit_mut_tx can make re-entrant view calls. + // TypedFunc is Copy, so this is cheap. + store.data_mut().call_view = call_view.clone(); + store.data_mut().call_view_anon = call_view_anon.clone(); + Ok(WasmtimeInstance { store, instance, @@ -351,6 +357,10 @@ pub struct WasmtimeInstance { } impl module_host_actor::WasmInstance for WasmtimeInstance { + fn set_module_def(&mut self, module_def: Arc) { + self.store.data_mut().module_def = Some(module_def); + } + fn extract_descriptions(&mut self) -> Result { let describer_func_name = DESCRIBE_MODULE_DUNDER; @@ -617,7 +627,7 @@ fn prepare_store_for_call(store: &mut Store, budget: FunctionBu /// # let identity = Identity::ZERO; /// let [sender_0, sender_1, sender_2, sender_3] = prepare_identity_for_call(identity); /// ``` -fn prepare_identity_for_call(caller_identity: Identity) -> [u64; 4] { +pub(super) fn prepare_identity_for_call(caller_identity: Identity) -> [u64; 4] { // Encode this as a LITTLE-ENDIAN byte array bytemuck::must_cast(caller_identity.to_byte_array()) } From 8bb1334cc0d91c6ef4f0ab8d4dbf72e5c13884d7 Mon Sep 17 00:00:00 2001 From: clockwork-labs-bot Date: Sun, 15 Feb 2026 12:03:50 -0500 Subject: [PATCH 2/2] fix: implement re-entrant view evaluation for V8 procedure commits Mirrors the wasmtime re-entrant view evaluation in the V8 backend. During procedure_commit_mut_tx in V8: - Check tx.view_for_update() for dirty views - Get hooks via get_registered_hooks(scope) - For each dirty view, call call_call_view/call_call_view_anon re-entrantly through the V8 scope - Process results (Rows or RawSql) and materialize backing tables - Commit the transaction with view updates included Also adds module_def to JsInstanceEnv (set via set_module_def on the WasmInstance trait) so view definitions are available during the syscall. Part of #4296 --- crates/core/src/host/v8/mod.rs | 9 ++ crates/core/src/host/v8/syscall/common.rs | 180 +++++++++++++++++++++- 2 files changed, 185 insertions(+), 4 deletions(-) diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index fdda70832cf..5a08002371c 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -43,6 +43,7 @@ use spacetimedb_datastore::locking_tx_datastore::FuncCallType; use spacetimedb_datastore::traits::Program; use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp}; use spacetimedb_schema::auto_migrate::MigrationPolicy; +use spacetimedb_schema::def::ModuleDef; use spacetimedb_schema::identifier::Identifier; use spacetimedb_table::static_assert_size; use std::panic::AssertUnwindSafe; @@ -225,6 +226,9 @@ struct JsInstanceEnv { /// A pool of unused allocated chunks that can be reused. // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`. chunk_pool: ChunkPool, + + /// Module definition for view evaluation during procedure commits. + module_def: Option>, } impl JsInstanceEnv { @@ -236,6 +240,7 @@ impl JsInstanceEnv { iters: <_>::default(), chunk_pool: <_>::default(), timing_spans: <_>::default(), + module_def: None, } } @@ -895,6 +900,10 @@ impl WasmInstance for V8Instance<'_, '_, '_> { .take_procedure_tx_offset(); (result, tx_offset) } + + fn set_module_def(&mut self, module_def: Arc) { + env_on_isolate_unwrap(self.scope).module_def = Some(module_def); + } } fn common_call<'scope, R, O, F>( diff --git a/crates/core/src/host/v8/syscall/common.rs b/crates/core/src/host/v8/syscall/common.rs index b084f2456ad..da8504bde0e 100644 --- a/crates/core/src/host/v8/syscall/common.rs +++ b/crates/core/src/host/v8/syscall/common.rs @@ -652,12 +652,184 @@ pub fn procedure_commit_mut_tx( scope: &mut PinScope<'_, '_>, _args: FunctionCallbackArguments<'_>, ) -> SysCallResult<()> { + use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall}; + use crate::subscription::module_subscription_actor::commit_and_broadcast_event; + use spacetimedb_client_api_messages::energy::EnergyQuanta; + use spacetimedb_datastore::locking_tx_datastore::ViewCallInfo; + use std::time::Duration; + + let env = get_env(scope)?; + + // Finish the anonymous tx context. + env.instance_env.finish_anon_tx()?; + + // Take the transaction from the slot. + let tx = env.instance_env.take_tx().map_err(crate::error::NodesError::from)?; + + // Collect dirty views. + let dirty_views: Vec = tx.view_for_update().cloned().collect(); + + let module_def = env.module_def.clone(); + let replica_ctx = env.instance_env.replica_ctx.clone(); + let stdb = replica_ctx.relational_db.clone(); + let database_identity = *env.instance_env.database_identity(); + let mut tx_slot = env.instance_env.tx.clone(); + + // Evaluate dirty views re-entrantly before committing. + let tx = if dirty_views.is_empty() || module_def.is_none() { + tx + } else { + let module_def = module_def.as_ref().unwrap(); + + // Get hooks for calling view functions. + let hooks = super::get_registered_hooks(scope); + + let mut current_tx = tx; + + for view_info in &dirty_views { + let is_anonymous = view_info.sender.is_none(); + let view_def = match module_def.get_view_by_id(view_info.fn_ptr, is_anonymous) { + Some(def) => def, + None => { + log::error!( + "view with fn_ptr `{}` (anonymous={}) not found in module_def", + view_info.fn_ptr, + is_anonymous + ); + continue; + } + }; + + let view_name = view_def.name.clone(); + let row_type = view_def.product_type_ref; + + // Put the tx back in the slot so the view function can access it. + let hooks_ref = &hooks; + let (mut returned_tx, call_result) = tx_slot.set(current_tx, || { + let Some(hooks) = hooks_ref else { + return Err(anyhow::anyhow!( + "No hooks registered, cannot evaluate view `{view_name}`" + )); + }; + + // Call the view function re-entrantly. + let view_return = if let Some(sender) = &view_info.sender { + super::call_call_view( + scope, + hooks, + crate::host::wasm_common::module_host_actor::ViewOp { + name: &view_name, + view_id: view_info.view_id, + table_id: view_info.table_id, + fn_ptr: view_info.fn_ptr, + sender, + args: &crate::host::ArgsTuple::nullary(), + timestamp: Timestamp::now(), + }, + ) + } else { + super::call_call_view_anon( + scope, + hooks, + crate::host::wasm_common::module_host_actor::AnonymousViewOp { + name: &view_name, + view_id: view_info.view_id, + table_id: view_info.table_id, + fn_ptr: view_info.fn_ptr, + args: &crate::host::ArgsTuple::nullary(), + timestamp: Timestamp::now(), + }, + ) + }; + + match view_return { + Ok(data) => Ok(data), + Err(e) => Err(anyhow::anyhow!("view `{view_name}` call failed: {e:?}")), + } + }); + + // Process the view result and materialize rows. + match call_result { + Ok(view_return_data) => { + if let Err(e) = process_v8_view_result( + view_return_data, + row_type, + view_info, + module_def, + &stdb, + &mut returned_tx, + database_identity, + ) { + log::error!("Error processing view `{view_name}` during procedure commit: {e:?}"); + } + } + Err(e) => { + log::error!("Error calling view `{view_name}` during procedure commit: {e:?}"); + } + } + + current_tx = returned_tx; + } + + current_tx + }; + + // Commit the transaction (with view materializations included). + let subs = replica_ctx.subscriptions.clone(); + let event = ModuleEvent { + timestamp: Timestamp::now(), + caller_identity: stdb.database_identity(), + caller_connection_id: None, + function_call: ModuleFunctionCall::default(), + status: EventStatus::Committed(DatabaseUpdate::default()), + reducer_return_value: None, + request_id: None, + timer: None, + energy_quanta_used: EnergyQuanta { quanta: 0 }, + host_execution_duration: Duration::from_millis(0), + }; + let event = commit_and_broadcast_event(&subs, None, event, tx); + let env = get_env(scope)?; + env.instance_env.procedure_last_tx_offset = Some(event.tx_offset); - // TODO(#4296): V8 needs equivalent re-entrant view evaluation during procedure commits. - // The wasmtime backend evaluates dirty views re-entrantly before committing. - // V8 currently skips this step and just commits directly. - env.instance_env.commit_mutable_tx()?; + Ok(()) +} + +/// Process a view's return data for V8: deserialize rows or execute SQL, then materialize. +fn process_v8_view_result( + view_return_data: crate::host::wasm_common::module_host_actor::ViewReturnData, + row_type: spacetimedb_sats::AlgebraicTypeRef, + view_info: &spacetimedb_datastore::locking_tx_datastore::ViewCallInfo, + module_def: &spacetimedb_schema::def::ModuleDef, + stdb: &crate::db::relational_db::RelationalDB, + tx: &mut spacetimedb_datastore::locking_tx_datastore::MutTxId, + database_identity: Identity, +) -> anyhow::Result<()> { + use crate::host::wasm_common::module_host_actor::{evaluate_view_sql, ViewResult}; + + let view_result = ViewResult::from_return_data(view_return_data)?; + let typespace = module_def.typespace(); + + let row_product_type = typespace + .resolve(row_type) + .resolve_refs() + .map_err(|e| anyhow::anyhow!("Error resolving row type: {e}"))? + .into_product() + .map_err(|_| anyhow::anyhow!("Row type is not a product type"))?; + + let rows = match view_result { + ViewResult::Rows(bytes) => { + crate::host::wasm_common::module_host_actor::deserialize_view_rows(row_type, bytes, typespace) + .map_err(|e| anyhow::anyhow!(e))? + } + ViewResult::RawSql(query) => evaluate_view_sql(tx, &query, &row_product_type, view_info, database_identity)?, + }; + + match &view_info.sender { + Some(sender) => stdb.materialize_view(tx, view_info.table_id, *sender, rows)?, + None => stdb.materialize_anonymous_view(tx, view_info.table_id, rows)?, + }; Ok(()) }