Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ fn env_on_isolate_unwrap(isolate: &mut Isolate) -> &mut JsInstanceEnv {
/// The environment of a [`JsInstance`].
struct JsInstanceEnv {
instance_env: InstanceEnv,
module_def: Option<Arc<ModuleDef>>,

/// The slab of `BufferIters` created for this instance.
iters: RowIters,
Expand All @@ -233,6 +234,7 @@ impl JsInstanceEnv {
fn new(instance_env: InstanceEnv) -> Self {
Self {
instance_env,
module_def: None,
call_times: CallTimes::new(),
iters: <_>::default(),
chunk_pool: <_>::default(),
Expand Down Expand Up @@ -273,6 +275,14 @@ impl JsInstanceEnv {
wasm_instance_env_call_times,
}
}

fn set_module_def(&mut self, module_def: Arc<ModuleDef>) {
self.module_def = Some(module_def);
}

fn module_def(&self) -> Option<Arc<ModuleDef>> {
self.module_def.clone()
}
}

/// An instance for a [`JsModule`].
Expand Down Expand Up @@ -616,6 +626,7 @@ async fn spawn_instance_worker(
return;
}
Ok((crf, module_common)) => {
env_on_isolate_unwrap(scope).set_module_def(module_common.info().module_def.clone());
// Success! Send `module_common` to the spawner.
if send_result(Ok(module_common.clone())).is_err() {
return;
Expand Down Expand Up @@ -757,8 +768,7 @@ async fn spawn_instance_worker(
}

/// The embedder data slot for the `__get_error_constructor__` function.
/// One greater than the greatest value of [`syscall::ModuleHookKey`].
const GET_ERROR_CONSTRUCTOR_SLOT: i32 = 5;
const GET_ERROR_CONSTRUCTOR_SLOT: i32 = syscall::ModuleHookKey::GetErrorConstructor as i32;

/// Compiles, instantiate, and evaluate `code` as a module.
fn eval_module<'scope>(
Expand Down Expand Up @@ -854,7 +864,9 @@ impl WasmInstance for V8Instance<'_, '_, '_> {
self.scope.get_slot::<JsInstanceEnv>().unwrap().instance_env.tx.clone()
}

fn set_module_def(&mut self, _: Arc<ModuleDef>) {}
fn set_module_def(&mut self, module_def: Arc<ModuleDef>) {
env_on_isolate_unwrap(self.scope).set_module_def(module_def);
}

fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ReducerExecuteResult {
common_call(self.scope, budget, op, |scope, op| {
Expand Down
218 changes: 211 additions & 7 deletions crates/core/src/host/v8/syscall/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,22 @@ use super::super::{
util::make_uint8array,
JsInstanceEnv,
};
use super::HookFunctions;
use super::{call_call_view, call_call_view_anon, get_registered_hooks, HookFunctions};
use crate::database_logger::{LogLevel, Record};
use crate::error::NodesError;
use crate::host::instance_env::InstanceEnv;
use crate::host::wasm_common::{RowIterIdx, TimingSpan, TimingSpanIdx};
use crate::{
database_logger::{LogLevel, Record},
host::wasm_common::module_host_actor::ProcedureOp,
use crate::host::wasm_common::module_host_actor::{
deserialize_view_rows, run_query_for_view, AnonymousViewOp, ProcedureOp, ViewOp, ViewResult, ViewReturnData,
};
use crate::host::wasm_common::{RowIterIdx, TimingSpan, TimingSpanIdx};
use anyhow::Context;
use bytes::Bytes;
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo};
use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp};
use spacetimedb_primitives::{ColId, IndexId, ProcedureId, TableId};
use spacetimedb_sats::bsatn;
use spacetimedb_schema::def::ModuleDef;
use spacetimedb_schema::identifier::Identifier;
use v8::{FunctionCallbackArguments, Isolate, Local, PinScope, Value};

/// Calls the `__call_procedure__` function `fun`.
Expand Down Expand Up @@ -652,9 +656,209 @@ pub fn procedure_commit_mut_tx(
scope: &mut PinScope<'_, '_>,
_args: FunctionCallbackArguments<'_>,
) -> SysCallResult<()> {
let env = get_env(scope)?;
let tx = {
let env = get_env(scope)?;
env.instance_env.take_mutable_tx_for_commit()?
};

env.instance_env.commit_mutable_tx()?;
let hooks = get_registered_hooks(scope).ok_or_else(|| {
SysCallError::Exception(
TypeError("module hooks are unavailable while committing a procedure transaction").throw(scope),
)
})?;
let module_def = get_env(scope)?.module_def().ok_or_else(|| {
SysCallError::Exception(
TypeError("module definition is unavailable while committing a procedure transaction").throw(scope),
)
})?;
let tx = refresh_views(scope, tx, &hooks, &module_def)?;
get_env(scope)?.instance_env.commit_procedure_tx(tx)?;

Ok(())
}

/// Refresh all views made stale by a procedure `tx`.
///
/// This runs each pending view call in the same mutable transaction and writes the refreshed rows
/// into the corresponding backing view tables. If any step fails (missing metadata, view execution,
/// row decoding, SQL execution, or materialization), this method rolls back `tx` and returns an error.
///
/// On success, it returns the same transaction handle so the caller can commit it.
fn refresh_views(
scope: &mut PinScope<'_, '_>,
tx: MutTxId,
hooks: &HookFunctions<'_>,
module_def: &ModuleDef,
) -> SysCallResult<MutTxId> {
let views_for_refresh = tx.views_for_refresh().cloned().collect::<Vec<_>>();
let stdb = get_env(scope)?.instance_env.relational_db().clone();
let database_identity = *get_env(scope)?.instance_env.database_identity();
let mut tx_slot = get_env(scope)?.instance_env.tx.clone();
let mut tx = Some(tx);

for view_call in views_for_refresh {
let res: SysCallResult<()> = (|| {
let view_def = module_def
.get_view_by_id(view_call.fn_ptr, view_call.sender.is_none())
.ok_or_else(|| {
SysCallError::Exception(
TypeError(format!(
"view with fn_ptr `{}` not found while refreshing procedure transaction",
view_call.fn_ptr
))
.throw(scope),
)
})?;

let current_tx = tx.take().expect("procedure tx missing during view refresh");
let (next_tx, call_result) =
tx_slot.set(current_tx, || call_view(scope, hooks, &view_call, &view_def.name));
tx = Some(next_tx);
let return_data = call_result?;

let typespace = module_def.typespace();
let row_product_type = typespace
.resolve(view_def.product_type_ref)
.resolve_refs()
.map_err(|err| {
SysCallError::Exception(
TypeError(format!(
"failed resolving row type for refreshed view `{}`: {err}",
view_def.name
))
.throw(scope),
)
})?
.into_product()
.map_err(|_| {
SysCallError::Exception(
TypeError(format!(
"failed resolving row product type for refreshed view `{}`",
view_def.name
))
.throw(scope),
)
})?;

let rows = match ViewResult::from_return_data(return_data).map_err(|err| {
SysCallError::Exception(
TypeError(format!(
"failed parsing result for refreshed view `{}`: {err}",
view_def.name
))
.throw(scope),
)
})? {
ViewResult::Rows(bytes) => {
deserialize_view_rows(view_def.product_type_ref, bytes, typespace).map_err(NodesError::from)?
}
ViewResult::RawSql(query) => run_query_for_view(
tx.as_mut().expect("procedure tx missing while running view query"),
&query,
&row_product_type,
&view_call,
database_identity,
)
.map_err(|err| {
SysCallError::Exception(
TypeError(format!(
"failed running query for refreshed view `{}`: {err}",
view_def.name
))
.throw(scope),
)
})?,
};

match view_call.sender {
Some(sender) => stdb
.materialize_view(
tx.as_mut()
.expect("procedure tx missing while materializing authenticated view"),
view_call.table_id,
sender,
rows,
)
.map_err(NodesError::from)?,
None => stdb
.materialize_anonymous_view(
tx.as_mut()
.expect("procedure tx missing while materializing anonymous view"),
view_call.table_id,
rows,
)
.map_err(NodesError::from)?,
}

Ok(())
})();

if let Err(err) = res {
let tx = tx.expect("procedure tx missing while rolling back failed view refresh");
get_env(scope)?.instance_env.rollback_procedure_tx(tx);
return Err(err);
}
}

Ok(tx.expect("procedure tx missing after refreshing views"))
}

/// Execute a view and return its payload.
///
/// This helper is used by [`refresh_views`] while a procedure transaction is being committed.
/// It temporarily sets the active function type to the target view for dependency tracking,
/// invokes the applicable JS hook, restores the previous function type, and returns [`ViewReturnData`].
fn call_view(
scope: &mut PinScope<'_, '_>,
hooks: &HookFunctions<'_>,
view_call: &ViewCallInfo,
view_name: &Identifier,
) -> SysCallResult<ViewReturnData> {
let prev_func_type = get_env(scope)?
.instance_env
.swap_func_type(FuncCallType::View(view_call.clone()));

let result = {
let args = crate::host::ArgsTuple::nullary();
match view_call.sender {
Some(sender) => call_call_view(
scope,
hooks,
ViewOp {
name: view_name,
view_id: view_call.view_id,
table_id: view_call.table_id,
fn_ptr: view_call.fn_ptr,
args: &args,
sender: &sender,
timestamp: Timestamp::now(),
},
),
None => call_call_view_anon(
scope,
hooks,
AnonymousViewOp {
name: view_name,
view_id: view_call.view_id,
table_id: view_call.table_id,
fn_ptr: view_call.fn_ptr,
args: &args,
timestamp: Timestamp::now(),
},
),
}
};

get_env(scope)?.instance_env.swap_func_type(prev_func_type);

result.map_err(|err| match err {
ErrorOrException::Err(err) => SysCallError::Exception(
TypeError(format!(
"failed executing refreshed view `{}` during procedure commit: {err}",
view_name
))
.throw(scope),
),
ErrorOrException::Exception(exc) => SysCallError::Exception(exc),
})
}
44 changes: 41 additions & 3 deletions crates/core/src/host/v8/syscall/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,46 @@ pub(super) fn set_hook_slots(
Ok(())
}

/// Registers the hooks for the current module instance so they can be reconstructed later
/// from procedure syscalls that only have access to the current V8 context.
pub(in super::super) fn set_registered_hooks(scope: &mut PinScope<'_, '_>, hooks: &HookFunctions<'_>) -> ExcResult<()> {
let mut to_register = vec![
(ModuleHookKey::DescribeModule, hooks.describe_module),
(ModuleHookKey::CallReducer, hooks.call_reducer),
];
if let Some(call_view) = hooks.call_view {
to_register.push((ModuleHookKey::CallView, call_view));
}
if let Some(call_view_anon) = hooks.call_view_anon {
to_register.push((ModuleHookKey::CallAnonymousView, call_view_anon));
}
if let Some(call_procedure) = hooks.call_procedure {
to_register.push((ModuleHookKey::CallProcedure, call_procedure));
}
if let Some(get_error_constructor) = hooks.get_error_constructor {
to_register.push((ModuleHookKey::GetErrorConstructor, get_error_constructor));
}
if let Some(sender_error_class) = hooks.sender_error_class {
to_register.push((ModuleHookKey::SenderErrorClass, sender_error_class));
}

set_hook_slots(scope, hooks.abi, &to_register)?;

let ctx = scope.get_current_context();
ctx.set_embedder_data(RECV_SLOT_INDEX, hooks.recv);

Ok(())
}

#[derive(enum_map::Enum, Copy, Clone)]
pub(in super::super) enum ModuleHookKey {
DescribeModule,
CallReducer,
CallView,
CallAnonymousView,
CallProcedure,
GetErrorConstructor,
SenderErrorClass,
}

impl ModuleHookKey {
Expand All @@ -59,6 +92,9 @@ impl ModuleHookKey {
}
}

/// Context embedder slot holding the receiver (`this`) value used for hook calls.
const RECV_SLOT_INDEX: i32 = ModuleHookKey::SenderErrorClass as i32 + 1;

/// Holds the `AbiVersion` used by the module
/// and the module hooks registered by the module
/// for that version.
Expand Down Expand Up @@ -126,10 +162,12 @@ pub(in super::super) fn get_registered_hooks<'scope>(

Some(HookFunctions {
abi: hooks.abi,
recv: v8::undefined(scope).into(),
recv: ctx
.get_embedder_data(scope, RECV_SLOT_INDEX)
.unwrap_or_else(|| v8::undefined(scope).into()),
describe_module: get(ModuleHookKey::DescribeModule)?,
get_error_constructor: None,
sender_error_class: None,
get_error_constructor: get(ModuleHookKey::GetErrorConstructor),
sender_error_class: get(ModuleHookKey::SenderErrorClass),
call_reducer: get(ModuleHookKey::CallReducer)?,
call_view: get(ModuleHookKey::CallView),
call_view_anon: get(ModuleHookKey::CallAnonymousView),
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/v8/syscall/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod hooks;
mod v1;
mod v2;

pub(super) use self::hooks::{get_registered_hooks, HookFunctions, ModuleHookKey};
pub(super) use self::hooks::{get_registered_hooks, set_registered_hooks, HookFunctions, ModuleHookKey};

/// The return type of a module -> host syscall.
pub(super) type FnRet<'scope> = ExcResult<Local<'scope, v8::Value>>;
Expand Down
Loading
Loading