11use anyhow:: { anyhow, Result } ;
2- use rust_embed :: RustEmbed ;
2+ use std :: path :: PathBuf ;
33use std:: string:: String ;
4- use std:: { collections:: HashSet , path:: PathBuf } ;
54use wasmtime:: { AsContextMut , Config , Engine , Linker , Module , ResourceLimiter , Store } ;
6- use wasmtime_wasi:: pipe:: { MemoryInputPipe , MemoryOutputPipe } ;
75use wasmtime_wasi:: preview1:: WasiP1Ctx ;
8- use wasmtime_wasi:: { I32Exit , WasiCtxBuilder } ;
6+ use wasmtime_wasi:: I32Exit ;
97
108use crate :: function_run_result:: FunctionRunResult ;
9+ use crate :: io:: { IOHandler , OutputAndLogs } ;
1110use crate :: { BytesContainer , BytesContainerType } ;
1211
1312#[ derive( Clone ) ]
@@ -16,44 +15,12 @@ pub struct ProfileOpts {
1615 pub out : PathBuf ,
1716}
1817
19- #[ derive( RustEmbed ) ]
20- #[ folder = "providers/" ]
21- struct StandardProviders ;
22-
2318pub fn uses_msgpack_provider ( module : & Module ) -> bool {
2419 module. imports ( ) . map ( |i| i. module ( ) ) . any ( |module| {
2520 module. starts_with ( "shopify_function_v" ) || module == "shopify_functions_javy_v2"
2621 } )
2722}
2823
29- fn import_modules < T > (
30- module : & Module ,
31- engine : & Engine ,
32- linker : & mut Linker < T > ,
33- mut store : & mut Store < T > ,
34- ) {
35- let imported_modules: HashSet < String > =
36- module. imports ( ) . map ( |i| i. module ( ) . to_string ( ) ) . collect ( ) ;
37-
38- imported_modules. iter ( ) . for_each ( |module_name| {
39- let provider_path = format ! ( "{module_name}.wasm" ) ;
40- let imported_module_bytes = StandardProviders :: get ( & provider_path) ;
41-
42- if let Some ( bytes) = imported_module_bytes {
43- let imported_module = Module :: from_binary ( engine, & bytes. data )
44- . unwrap_or_else ( |_| panic ! ( "Failed to load module {module_name}" ) ) ;
45-
46- let imported_module_instance = linker
47- . instantiate ( & mut store, & imported_module)
48- . expect ( "Failed to instantiate imported instance" ) ;
49-
50- linker
51- . instance ( & mut store, module_name, imported_module_instance)
52- . expect ( "Failed to import module" ) ;
53- }
54- } ) ;
55- }
56-
5724pub struct FunctionRunParams < ' a > {
5825 pub function_path : PathBuf ,
5926 pub input : BytesContainer ,
@@ -68,12 +35,12 @@ const STARTING_FUEL: u64 = u64::MAX;
6835const MAXIMUM_MEMORIES : usize = 2 ; // 1 for the module, 1 for Javy's provider
6936
7037struct FunctionContext {
71- wasi : WasiP1Ctx ,
38+ wasi : Option < WasiP1Ctx > ,
7239 limiter : MemoryLimiter ,
7340}
7441
7542impl FunctionContext {
76- fn new ( wasi : WasiP1Ctx ) -> Self {
43+ fn new ( wasi : Option < WasiP1Ctx > ) -> Self {
7744 Self {
7845 wasi,
7946 limiter : Default :: default ( ) ,
@@ -128,85 +95,77 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
12895 module,
12996 } = params;
13097
131- let input_stream = MemoryInputPipe :: new ( input. raw . clone ( ) ) ;
132- let output_stream = MemoryOutputPipe :: new ( usize:: MAX ) ;
133- let error_stream = MemoryOutputPipe :: new ( usize:: MAX ) ;
98+ let mut io_handler = IOHandler :: new ( module, input. clone ( ) ) ;
13499
135100 let memory_usage: u64 ;
136101 let instructions: u64 ;
137102 let mut error_logs: String = String :: new ( ) ;
138103 let mut module_result: Result < ( ) , anyhow:: Error > ;
139104 let profile_data: Option < String > ;
140105
141- {
142- let mut linker = Linker :: new ( & engine) ;
106+ let mut linker = Linker :: new ( & engine) ;
107+ let wasi = io_handler. wasi ( ) ;
108+ if wasi. is_some ( ) {
143109 wasmtime_wasi:: preview1:: add_to_linker_sync ( & mut linker, |ctx : & mut FunctionContext | {
144- & mut ctx. wasi
110+ ctx. wasi . as_mut ( ) . expect ( "Should have WASI context" )
145111 } ) ?;
146112 deterministic_wasi_ctx:: replace_scheduling_functions ( & mut linker) ?;
147- let mut wasi_builder = WasiCtxBuilder :: new ( ) ;
148- wasi_builder. stdin ( input_stream) ;
149- wasi_builder. stdout ( output_stream. clone ( ) ) ;
150- wasi_builder. stderr ( error_stream. clone ( ) ) ;
151- deterministic_wasi_ctx:: add_determinism_to_wasi_ctx_builder ( & mut wasi_builder) ;
152- let wasi = wasi_builder. build_p1 ( ) ;
153- let function_context = FunctionContext :: new ( wasi) ;
154- let mut store = Store :: new ( & engine, function_context) ;
155- store. limiter ( |s| & mut s. limiter ) ;
156- store. set_fuel ( STARTING_FUEL ) ?;
157- store. set_epoch_deadline ( 1 ) ;
158-
159- import_modules ( & module, & engine, & mut linker, & mut store) ;
160-
161- linker. module ( & mut store, "Function" , & module) ?;
162- let instance = linker. instantiate ( & mut store, & module) ?;
163-
164- let func = instance. get_typed_func :: < ( ) , ( ) > ( store. as_context_mut ( ) , export) ?;
165-
166- ( module_result, profile_data) = if let Some ( profile_opts) = profile_opts {
167- let ( result, profile_data) = wasmprof:: ProfilerBuilder :: new ( & mut store)
168- . frequency ( profile_opts. interval )
169- . weight_unit ( wasmprof:: WeightUnit :: Fuel )
170- . profile ( |store| func. call ( store. as_context_mut ( ) , ( ) ) ) ;
171-
172- (
173- result,
174- Some ( profile_data. into_collapsed_stacks ( ) . to_string ( ) ) ,
175- )
176- } else {
177- ( func. call ( store. as_context_mut ( ) , ( ) ) , None )
178- } ;
179-
180- // modules may exit with a specific exit code, an exit code of 0 is considered success but is reported as
181- // a GuestFault by wasmtime, so we need to map it to a success result. Any other exit code is considered
182- // a failure.
183- module_result = module_result. or_else ( |error| match error. downcast_ref :: < I32Exit > ( ) {
184- Some ( I32Exit ( 0 ) ) => Ok ( ( ) ) ,
185- Some ( I32Exit ( code) ) => Err ( anyhow ! ( "module exited with code: {}" , code) ) ,
186- None => Err ( error) ,
187- } ) ;
113+ }
188114
189- memory_usage = store. data ( ) . max_memory_bytes ( ) as u64 / 1024 ;
190- instructions = STARTING_FUEL . saturating_sub ( store. get_fuel ( ) . unwrap_or_default ( ) ) ;
115+ let function_context = FunctionContext :: new ( wasi) ;
116+ let mut store = Store :: new ( & engine, function_context) ;
117+ store. limiter ( |s| & mut s. limiter ) ;
191118
192- match module_result {
193- Ok ( _) => { }
194- Err ( ref e) => {
195- error_logs = e. to_string ( ) ;
196- }
197- }
119+ io_handler. initialize ( & engine, & mut linker, & mut store) ?;
120+
121+ store. set_fuel ( STARTING_FUEL ) ?;
122+ store. set_epoch_deadline ( 1 ) ;
123+
124+ let instance = linker. instantiate ( & mut store, io_handler. module ( ) ) ?;
125+
126+ let func = instance. get_typed_func :: < ( ) , ( ) > ( store. as_context_mut ( ) , export) ?;
127+
128+ ( module_result, profile_data) = if let Some ( profile_opts) = profile_opts {
129+ let ( result, profile_data) = wasmprof:: ProfilerBuilder :: new ( & mut store)
130+ . frequency ( profile_opts. interval )
131+ . weight_unit ( wasmprof:: WeightUnit :: Fuel )
132+ . profile ( |store| func. call ( store. as_context_mut ( ) , ( ) ) ) ;
133+
134+ (
135+ result,
136+ Some ( profile_data. into_collapsed_stacks ( ) . to_string ( ) ) ,
137+ )
138+ } else {
139+ ( func. call ( store. as_context_mut ( ) , ( ) ) , None )
198140 } ;
199141
200- let mut logs = error_stream
201- . try_into_inner ( )
202- . expect ( "Log stream reference still exists" ) ;
142+ // modules may exit with a specific exit code, an exit code of 0 is considered success but is reported as
143+ // a GuestFault by wasmtime, so we need to map it to a success result. Any other exit code is considered
144+ // a failure.
145+ module_result = module_result. or_else ( |error| match error. downcast_ref :: < I32Exit > ( ) {
146+ Some ( I32Exit ( 0 ) ) => Ok ( ( ) ) ,
147+ Some ( I32Exit ( code) ) => Err ( anyhow ! ( "module exited with code: {}" , code) ) ,
148+ None => Err ( error) ,
149+ } ) ;
150+
151+ memory_usage = store. data ( ) . max_memory_bytes ( ) as u64 / 1024 ;
152+ instructions = STARTING_FUEL . saturating_sub ( store. get_fuel ( ) . unwrap_or_default ( ) ) ;
153+
154+ match module_result {
155+ Ok ( _) => { }
156+ Err ( ref e) => {
157+ error_logs = e. to_string ( ) ;
158+ }
159+ }
160+
161+ let OutputAndLogs {
162+ output : raw_output,
163+ mut logs,
164+ } = io_handler. finalize ( store) ?;
203165
204166 logs. extend_from_slice ( error_logs. as_bytes ( ) ) ;
205167
206168 let output_codec = input. codec ;
207- let raw_output = output_stream
208- . try_into_inner ( )
209- . expect ( "Output stream reference still exists" ) ;
210169 let output = BytesContainer :: new (
211170 BytesContainerType :: Output ,
212171 output_codec,
0 commit comments