Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub struct InstanceEnv {
/// Are we in an anonymous tx context?
in_anon_tx: bool,
/// A procedure's last known transaction offset.
procedure_last_tx_offset: Option<TransactionOffset>,
pub(crate) procedure_last_tx_offset: Option<TransactionOffset>,
}

/// `InstanceEnv` needs to be `Send` because it is created on the host thread
Expand Down Expand Up @@ -715,7 +715,7 @@ impl InstanceEnv {
/// Finishes an anonymous transaction,
/// returning `Some(_)` if there was no ongoing one,
/// in which case the caller should return early.
fn finish_anon_tx(&mut self) -> Result<(), NodesError> {
pub(crate) fn finish_anon_tx(&mut self) -> Result<(), NodesError> {
if self.in_anon_tx {
self.in_anon_tx = false;
Ok(())
Expand Down
9 changes: 9 additions & 0 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use spacetimedb_datastore::locking_tx_datastore::FuncCallType;
use spacetimedb_datastore::traits::Program;
use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp};
use spacetimedb_schema::auto_migrate::MigrationPolicy;
use spacetimedb_schema::def::ModuleDef;
use spacetimedb_schema::identifier::Identifier;
use spacetimedb_table::static_assert_size;
use std::panic::AssertUnwindSafe;
Expand Down Expand Up @@ -225,6 +226,9 @@ struct JsInstanceEnv {
/// A pool of unused allocated chunks that can be reused.
// TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`.
chunk_pool: ChunkPool,

/// Module definition for view evaluation during procedure commits.
module_def: Option<Arc<ModuleDef>>,
}

impl JsInstanceEnv {
Expand All @@ -236,6 +240,7 @@ impl JsInstanceEnv {
iters: <_>::default(),
chunk_pool: <_>::default(),
timing_spans: <_>::default(),
module_def: None,
}
}

Expand Down Expand Up @@ -895,6 +900,10 @@ impl WasmInstance for V8Instance<'_, '_, '_> {
.take_procedure_tx_offset();
(result, tx_offset)
}

fn set_module_def(&mut self, module_def: Arc<ModuleDef>) {
env_on_isolate_unwrap(self.scope).module_def = Some(module_def);
}
}

fn common_call<'scope, R, O, F>(
Expand Down
177 changes: 176 additions & 1 deletion crates/core/src/host/v8/syscall/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,184 @@ pub fn procedure_commit_mut_tx(
scope: &mut PinScope<'_, '_>,
_args: FunctionCallbackArguments<'_>,
) -> SysCallResult<()> {
use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall};
use crate::subscription::module_subscription_actor::commit_and_broadcast_event;
use spacetimedb_client_api_messages::energy::EnergyQuanta;
use spacetimedb_datastore::locking_tx_datastore::ViewCallInfo;
use std::time::Duration;

let env = get_env(scope)?;

// Finish the anonymous tx context.
env.instance_env.finish_anon_tx()?;

// Take the transaction from the slot.
let tx = env.instance_env.take_tx().map_err(crate::error::NodesError::from)?;

// Collect dirty views.
let dirty_views: Vec<ViewCallInfo> = tx.view_for_update().cloned().collect();

let module_def = env.module_def.clone();
let replica_ctx = env.instance_env.replica_ctx.clone();
let stdb = replica_ctx.relational_db.clone();
let database_identity = *env.instance_env.database_identity();
let mut tx_slot = env.instance_env.tx.clone();

// Evaluate dirty views re-entrantly before committing.
let tx = if dirty_views.is_empty() || module_def.is_none() {
tx
} else {
let module_def = module_def.as_ref().unwrap();

// Get hooks for calling view functions.
let hooks = super::get_registered_hooks(scope);

let mut current_tx = tx;

for view_info in &dirty_views {
let is_anonymous = view_info.sender.is_none();
let view_def = match module_def.get_view_by_id(view_info.fn_ptr, is_anonymous) {
Some(def) => def,
None => {
log::error!(
"view with fn_ptr `{}` (anonymous={}) not found in module_def",
view_info.fn_ptr,
is_anonymous
);
continue;
}
};

let view_name = view_def.name.clone();
let row_type = view_def.product_type_ref;

// Put the tx back in the slot so the view function can access it.
let hooks_ref = &hooks;
let (mut returned_tx, call_result) = tx_slot.set(current_tx, || {
let Some(hooks) = hooks_ref else {
return Err(anyhow::anyhow!(
"No hooks registered, cannot evaluate view `{view_name}`"
));
};

// Call the view function re-entrantly.
let view_return = if let Some(sender) = &view_info.sender {
super::call_call_view(
scope,
hooks,
crate::host::wasm_common::module_host_actor::ViewOp {
name: &view_name,
view_id: view_info.view_id,
table_id: view_info.table_id,
fn_ptr: view_info.fn_ptr,
sender,
args: &crate::host::ArgsTuple::nullary(),
timestamp: Timestamp::now(),
},
)
} else {
super::call_call_view_anon(
scope,
hooks,
crate::host::wasm_common::module_host_actor::AnonymousViewOp {
name: &view_name,
view_id: view_info.view_id,
table_id: view_info.table_id,
fn_ptr: view_info.fn_ptr,
args: &crate::host::ArgsTuple::nullary(),
timestamp: Timestamp::now(),
},
)
};

match view_return {
Ok(data) => Ok(data),
Err(e) => Err(anyhow::anyhow!("view `{view_name}` call failed: {e:?}")),
}
});

// Process the view result and materialize rows.
match call_result {
Ok(view_return_data) => {
if let Err(e) = process_v8_view_result(
view_return_data,
row_type,
view_info,
module_def,
&stdb,
&mut returned_tx,
database_identity,
) {
log::error!("Error processing view `{view_name}` during procedure commit: {e:?}");
}
}
Err(e) => {
log::error!("Error calling view `{view_name}` during procedure commit: {e:?}");
}
}

current_tx = returned_tx;
}

current_tx
};

// Commit the transaction (with view materializations included).
let subs = replica_ctx.subscriptions.clone();
let event = ModuleEvent {
timestamp: Timestamp::now(),
caller_identity: stdb.database_identity(),
caller_connection_id: None,
function_call: ModuleFunctionCall::default(),
status: EventStatus::Committed(DatabaseUpdate::default()),
reducer_return_value: None,
request_id: None,
timer: None,
energy_quanta_used: EnergyQuanta { quanta: 0 },
host_execution_duration: Duration::from_millis(0),
};
let event = commit_and_broadcast_event(&subs, None, event, tx);

let env = get_env(scope)?;
env.instance_env.procedure_last_tx_offset = Some(event.tx_offset);

env.instance_env.commit_mutable_tx()?;
Ok(())
}

/// Process a view's return data for V8: deserialize rows or execute SQL, then materialize.
fn process_v8_view_result(
view_return_data: crate::host::wasm_common::module_host_actor::ViewReturnData,
row_type: spacetimedb_sats::AlgebraicTypeRef,
view_info: &spacetimedb_datastore::locking_tx_datastore::ViewCallInfo,
module_def: &spacetimedb_schema::def::ModuleDef,
stdb: &crate::db::relational_db::RelationalDB,
tx: &mut spacetimedb_datastore::locking_tx_datastore::MutTxId,
database_identity: Identity,
) -> anyhow::Result<()> {
use crate::host::wasm_common::module_host_actor::{evaluate_view_sql, ViewResult};

let view_result = ViewResult::from_return_data(view_return_data)?;
let typespace = module_def.typespace();

let row_product_type = typespace
.resolve(row_type)
.resolve_refs()
.map_err(|e| anyhow::anyhow!("Error resolving row type: {e}"))?
.into_product()
.map_err(|_| anyhow::anyhow!("Row type is not a product type"))?;

let rows = match view_result {
ViewResult::Rows(bytes) => {
crate::host::wasm_common::module_host_actor::deserialize_view_rows(row_type, bytes, typespace)
.map_err(|e| anyhow::anyhow!(e))?
}
ViewResult::RawSql(query) => evaluate_view_sql(tx, &query, &row_product_type, view_info, database_identity)?,
};

match &view_info.sender {
Some(sender) => stdb.materialize_view(tx, view_info.table_id, *sender, rows)?,
None => stdb.materialize_anonymous_view(tx, view_info.table_id, rows)?,
};

Ok(())
}
119 changes: 71 additions & 48 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ pub trait WasmInstance {

fn tx_slot(&self) -> TxSlot;

/// Set the module definition on this instance.
/// This is needed for re-entrant view evaluation during procedure commits.
fn set_module_def(&mut self, _module_def: Arc<ModuleDef>) {}

fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ReducerExecuteResult;

fn call_view(&mut self, op: ViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult;
Expand Down Expand Up @@ -114,7 +118,7 @@ impl EnergyStats {
}
}

fn deserialize_view_rows(
pub(crate) fn deserialize_view_rows(
row_type: AlgebraicTypeRef,
bytes: Bytes,
typespace: &Typespace,
Expand Down Expand Up @@ -333,8 +337,11 @@ impl<T: WasmModule> WasmModuleHostActor<T> {
}

impl<T: WasmModule> WasmModuleHostActor<T> {
fn make_from_instance(&self, instance: T::Instance) -> WasmModuleInstance<T::Instance> {
fn make_from_instance(&self, mut instance: T::Instance) -> WasmModuleInstance<T::Instance> {
let common = InstanceCommon::new(&self.common);
// Provide the module definition to the instance so that
// re-entrant view evaluation can look up view metadata during procedure commits.
instance.set_module_def(Arc::new(common.info().module_def.clone()));
WasmModuleInstance {
instance,
common,
Expand Down Expand Up @@ -1244,59 +1251,75 @@ impl InstanceCommon {
expected_row_type: &ProductType,
call_info: &ViewCallInfo,
) -> anyhow::Result<Vec<ProductValue>> {
if the_query.trim().is_empty() {
return Ok(Vec::new());
}

// Views bypass RLS, since views should enforce their own access control procedurally.
let auth = AuthCtx::for_current(self.info.database_identity);
let schema_view = SchemaViewer::new(&*tx, &auth);

// Compile to subscription plans.
let (plans, has_params) = SubscriptionPlan::compile(the_query, &schema_view, &auth)?;
ensure!(
!has_params,
"parameterized SQL is not supported for view materialization yet"
);
evaluate_view_sql(tx, the_query, expected_row_type, call_info, self.info.database_identity)
}
}

// Validate shape and disallow views-on-views.
for plan in &plans {
let phys = plan.optimized_physical_plan();
let Some(source_schema) = phys.return_table() else {
bail!("query does not return plain table rows");
};
if phys.reads_from_view(true) || phys.reads_from_view(false) {
bail!("view definition cannot read from other views");
}
if source_schema.row_type != *expected_row_type {
bail!(
"query returns `{}` but view expects `{}`",
fmt_algebraic_type(&AlgebraicType::Product(source_schema.row_type.clone())),
fmt_algebraic_type(&AlgebraicType::Product(expected_row_type.clone())),
);
}
/// Compiles and runs a SQL query that was returned from a view.
/// This is a standalone function extracted from `InstanceCommon::run_query_for_view`
/// so it can be called from re-entrant view evaluation during procedure commits.
pub(crate) fn evaluate_view_sql(
tx: &mut MutTxId,
the_query: &str,
expected_row_type: &ProductType,
call_info: &ViewCallInfo,
database_identity: Identity,
) -> anyhow::Result<Vec<ProductValue>> {
if the_query.trim().is_empty() {
return Ok(Vec::new());
}

// Views bypass RLS, since views should enforce their own access control procedurally.
let auth = AuthCtx::for_current(database_identity);
let schema_view = SchemaViewer::new(&*tx, &auth);

// Compile to subscription plans.
let (plans, has_params) = SubscriptionPlan::compile(the_query, &schema_view, &auth)?;
ensure!(
!has_params,
"parameterized SQL is not supported for view materialization yet"
);

// Validate shape and disallow views-on-views.
for plan in &plans {
let phys = plan.optimized_physical_plan();
let Some(source_schema) = phys.return_table() else {
bail!("query does not return plain table rows");
};
if phys.reads_from_view(true) || phys.reads_from_view(false) {
bail!("view definition cannot read from other views");
}
if source_schema.row_type != *expected_row_type {
bail!(
"query returns `{}` but view expects `{}`",
fmt_algebraic_type(&AlgebraicType::Product(source_schema.row_type.clone())),
fmt_algebraic_type(&AlgebraicType::Product(expected_row_type.clone())),
);
}
}

let op = FuncCallType::View(call_info.clone());
let mut metrics = ExecutionMetrics::default();
let mut rows = Vec::new();

for plan in plans {
// Track read sets for all tables involved in this plan.
// TODO(jsdt): This means we will rerun the view and query for any change to these tables, so we should optimize this asap.
for table_id in plan.table_ids() {
tx.record_table_scan(&op, table_id);
}
let op = FuncCallType::View(call_info.clone());
let mut metrics = ExecutionMetrics::default();
let mut rows = Vec::new();

let pipelined = PipelinedProject::from(plan.optimized_physical_plan().clone());
pipelined.execute(&*tx, &mut metrics, &mut |row| {
rows.push(row.to_product_value());
Ok(())
})?;
for plan in plans {
// Track read sets for all tables involved in this plan.
// TODO(jsdt): This means we will rerun the view and query for any change to these tables, so we should optimize this asap.
for table_id in plan.table_ids() {
tx.record_table_scan(&op, table_id);
}

Ok(rows)
let pipelined = PipelinedProject::from(plan.optimized_physical_plan().clone());
pipelined.execute(&*tx, &mut metrics, &mut |row| {
rows.push(row.to_product_value());
Ok(())
})?;
}

Ok(rows)
}

impl InstanceCommon {
/// A [`MutTxId`] knows which views must be updated (re-evaluated).
/// This method re-evaluates them and updates their backing tables.
pub(crate) fn call_views_with_tx<I: WasmInstance>(
Expand Down
Loading
Loading