Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ where
let module_def = &module.info.module_def;
let response_json = match version {
SchemaVersion::V9 => {
let raw = RawModuleDefV9::from(module_def.clone());
let raw = RawModuleDefV9::from(module_def.as_ref().clone());
axum::Json(sats::serde::SerdeWrapper(raw)).into_response()
}
};
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1282,8 +1282,8 @@ pub(crate) async fn extract_schema_with_pools(
Host::try_init_in_memory_to_check(runtimes, page_pool, database, program, core, bsatn_rlb_pool).await?;
// this should always succeed, but sometimes it doesn't
let module_def = match Arc::try_unwrap(module_info) {
Ok(info) => info.module_def,
Err(info) => info.module_def.clone(),
Ok(info) => Arc::try_unwrap(info.module_def).unwrap_or_else(|module_def| (*module_def).clone()),
Err(info) => (*info.module_def).clone(),
};

Ok(module_def)
Expand Down
25 changes: 21 additions & 4 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,11 @@ impl InstanceEnv {
self.func_name.as_deref()
}

/// Swap in a temporary function type, returning the previous one.
pub fn swap_func_type(&mut self, func_type: FuncCallType) -> FuncCallType {
mem::replace(&mut self.func_type, func_type)
}

fn get_tx(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
self.tx.get()
}
Expand Down Expand Up @@ -733,10 +738,19 @@ impl InstanceEnv {
// on `tokio::runtime::Handle::try_current()` before being able to run the `get_tx()` check.

pub fn commit_mutable_tx(&mut self) -> Result<(), NodesError> {
let tx = self.take_mutable_tx_for_commit()?;
self.commit_procedure_tx(tx)
}

/// Extract an anonymous mutable tx so callers can perform extra work before commit.
pub fn take_mutable_tx_for_commit(&mut self) -> Result<MutTxId, NodesError> {
self.finish_anon_tx()?;
Ok(self.take_tx()?)
}

/// Commit an anonymous procedure tx and broadcast resulting updates.
pub fn commit_procedure_tx(&mut self, tx: MutTxId) -> Result<(), NodesError> {
let stdb = self.relational_db().clone();
let tx = self.take_tx()?;
let subs = self.replica_ctx.subscriptions.clone();

let event = ModuleEvent {
Expand All @@ -761,13 +775,16 @@ impl InstanceEnv {

pub fn abort_mutable_tx(&mut self) -> Result<(), NodesError> {
self.finish_anon_tx()?;
let stdb = self.relational_db().clone();
let tx = self.take_tx()?;
self.rollback_procedure_tx(tx);
Ok(())
}

// Roll back the tx.
/// Roll back an anonymous procedure tx and record the resulting offset.
pub fn rollback_procedure_tx(&mut self, tx: MutTxId) {
let stdb = self.relational_db().clone();
let offset = ModuleSubscriptions::rollback_mut_tx(&stdb, tx);
self.procedure_last_tx_offset = Some(from_tx_offset(offset));
Ok(())
}

/// In-case there is a anonymous tx at the end of a procedure,
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub struct ModuleInfo {
/// The definition of the module.
/// Loaded by loading the module's program from the system tables, extracting its definition,
/// and validating.
pub module_def: ModuleDef,
pub module_def: Arc<ModuleDef>,
/// The identity of the module.
pub owner_identity: Identity,
/// The identity of the database.
Expand Down Expand Up @@ -292,7 +292,7 @@ impl ModuleInfo {
) -> Arc<Self> {
let metrics = ModuleMetrics::new(&database_identity);
Arc::new(ModuleInfo {
module_def,
module_def: Arc::new(module_def),
owner_identity,
database_identity,
module_hash,
Expand Down Expand Up @@ -1936,7 +1936,7 @@ impl ModuleHost {
table_id,
fn_ptr,
sender,
} in out.tx.view_for_update().cloned().collect::<Vec<_>>()
} in out.tx.views_for_refresh().cloned().collect::<Vec<_>>()
{
let view_def = module_def
.get_view_by_id(fn_ptr, sender.is_none())
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use spacetimedb_datastore::locking_tx_datastore::FuncCallType;
use spacetimedb_datastore::traits::Program;
use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp};
use spacetimedb_schema::auto_migrate::MigrationPolicy;
use spacetimedb_schema::def::ModuleDef;
use spacetimedb_schema::identifier::Identifier;
use spacetimedb_table::static_assert_size;
use std::panic::AssertUnwindSafe;
Expand Down Expand Up @@ -853,6 +854,8 @@ impl WasmInstance for V8Instance<'_, '_, '_> {
self.scope.get_slot::<JsInstanceEnv>().unwrap().instance_env.tx.clone()
}

fn set_module_def(&mut self, _: Arc<ModuleDef>) {}

fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ReducerExecuteResult {
common_call(self.scope, budget, op, |scope, op| {
Ok(call_call_reducer(scope, self.hooks, op, self.reducer_args_buf)?)
Expand Down
123 changes: 68 additions & 55 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ pub trait WasmInstance {

fn tx_slot(&self) -> TxSlot;

fn set_module_def(&mut self, module_def: Arc<ModuleDef>);

fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ReducerExecuteResult;

fn call_view(&mut self, op: ViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult;
Expand Down Expand Up @@ -114,7 +116,7 @@ impl EnergyStats {
}
}

fn deserialize_view_rows(
pub(crate) fn deserialize_view_rows(
row_type: AlgebraicTypeRef,
bytes: Bytes,
typespace: &Typespace,
Expand Down Expand Up @@ -143,6 +145,67 @@ fn deserialize_view_rows(
.collect()
}

pub(crate) fn run_query_for_view(
tx: &mut MutTxId,
the_query: &str,
expected_row_type: &ProductType,
call_info: &ViewCallInfo,
database_identity: Identity,
) -> anyhow::Result<Vec<ProductValue>> {
if the_query.trim().is_empty() {
return Ok(Vec::new());
}

// Views bypass RLS, since views should enforce their own access control procedurally.
let auth = AuthCtx::for_current(database_identity);
let schema_view = SchemaViewer::new(&*tx, &auth);

// Compile to subscription plans.
let (plans, has_params) = SubscriptionPlan::compile(the_query, &schema_view, &auth)?;
ensure!(
!has_params,
"parameterized SQL is not supported for view materialization yet"
);

// Validate shape and disallow views-on-views.
for plan in &plans {
let phys = plan.optimized_physical_plan();
let Some(source_schema) = phys.return_table() else {
bail!("query does not return plain table rows");
};
if phys.reads_from_view(true) || phys.reads_from_view(false) {
bail!("view definition cannot read from other views");
}
if source_schema.row_type != *expected_row_type {
bail!(
"query returns `{}` but view expects `{}`",
fmt_algebraic_type(&AlgebraicType::Product(source_schema.row_type.clone())),
fmt_algebraic_type(&AlgebraicType::Product(expected_row_type.clone())),
);
}
}

let op = FuncCallType::View(call_info.clone());
let mut metrics = ExecutionMetrics::default();
let mut rows = Vec::new();

for plan in plans {
// Track read sets for all tables involved in this plan.
// TODO(jsdt): This means we will rerun the view and query for any change to these tables, so we should optimize this asap.
for table_id in plan.table_ids() {
tx.record_table_scan(&op, table_id);
}

let pipelined = PipelinedProject::from(plan.optimized_physical_plan().clone());
pipelined.execute(&*tx, &mut metrics, &mut |row| {
rows.push(row.to_product_value());
Ok(())
})?;
}

Ok(rows)
}

pub struct ExecutionTimings {
pub total_duration: Duration,
pub wasm_instance_env_call_times: CallTimes,
Expand Down Expand Up @@ -333,8 +396,9 @@ impl<T: WasmModule> WasmModuleHostActor<T> {
}

impl<T: WasmModule> WasmModuleHostActor<T> {
fn make_from_instance(&self, instance: T::Instance) -> WasmModuleInstance<T::Instance> {
fn make_from_instance(&self, mut instance: T::Instance) -> WasmModuleInstance<T::Instance> {
let common = InstanceCommon::new(&self.common);
instance.set_module_def(common.info().module_def.clone());
WasmModuleInstance {
instance,
common,
Expand Down Expand Up @@ -1244,58 +1308,7 @@ impl InstanceCommon {
expected_row_type: &ProductType,
call_info: &ViewCallInfo,
) -> anyhow::Result<Vec<ProductValue>> {
if the_query.trim().is_empty() {
return Ok(Vec::new());
}

// Views bypass RLS, since views should enforce their own access control procedurally.
let auth = AuthCtx::for_current(self.info.database_identity);
let schema_view = SchemaViewer::new(&*tx, &auth);

// Compile to subscription plans.
let (plans, has_params) = SubscriptionPlan::compile(the_query, &schema_view, &auth)?;
ensure!(
!has_params,
"parameterized SQL is not supported for view materialization yet"
);

// Validate shape and disallow views-on-views.
for plan in &plans {
let phys = plan.optimized_physical_plan();
let Some(source_schema) = phys.return_table() else {
bail!("query does not return plain table rows");
};
if phys.reads_from_view(true) || phys.reads_from_view(false) {
bail!("view definition cannot read from other views");
}
if source_schema.row_type != *expected_row_type {
bail!(
"query returns `{}` but view expects `{}`",
fmt_algebraic_type(&AlgebraicType::Product(source_schema.row_type.clone())),
fmt_algebraic_type(&AlgebraicType::Product(expected_row_type.clone())),
);
}
}

let op = FuncCallType::View(call_info.clone());
let mut metrics = ExecutionMetrics::default();
let mut rows = Vec::new();

for plan in plans {
// Track read sets for all tables involved in this plan.
// TODO(jsdt): This means we will rerun the view and query for any change to these tables, so we should optimize this asap.
for table_id in plan.table_ids() {
tx.record_table_scan(&op, table_id);
}

let pipelined = PipelinedProject::from(plan.optimized_physical_plan().clone());
pipelined.execute(&*tx, &mut metrics, &mut |row| {
rows.push(row.to_product_value());
Ok(())
})?;
}

Ok(rows)
run_query_for_view(tx, the_query, expected_row_type, call_info, self.info.database_identity)
}
/// A [`MutTxId`] knows which views must be updated (re-evaluated).
/// This method re-evaluates them and updates their backing tables.
Expand All @@ -1308,7 +1321,7 @@ impl InstanceCommon {
timestamp: Timestamp,
) -> (ViewCallResult, bool) {
let view_calls = tx
.view_for_update()
.views_for_refresh()
.map(|info| {
let view_def = module_def
.get_view_by_id(info.fn_ptr, info.sender.is_none())
Expand Down
Loading
Loading