diff --git a/Cargo.lock b/Cargo.lock index d977666c0e..22a7707031 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3106,7 +3106,7 @@ dependencies = [ [[package]] name = "fluvio-socket" -version = "0.14.0" +version = "0.14.1" dependencies = [ "async-channel", "async-lock", diff --git a/crates/fluvio-smartengine/src/engine/common.rs b/crates/fluvio-smartengine/src/engine/common.rs index 79f46f2bb7..d46451cd88 100644 --- a/crates/fluvio-smartengine/src/engine/common.rs +++ b/crates/fluvio-smartengine/src/engine/common.rs @@ -8,9 +8,13 @@ use fluvio_smartmodule::dataplane::smartmodule::{ SmartModuleExtraParams, }; -pub trait WasmInstance { +use crate::SmartModuleInitialData; + +use super::error::EngineError; + +pub(crate) trait WasmInstance { type Context; - type Func: WasmFn; + type Func: WasmFn + Send + Sync + 'static; fn params(&self) -> SmartModuleExtraParams; @@ -26,12 +30,12 @@ pub trait WasmInstance { /// All smartmodule wasm functions have the same ABI: /// `(ptr: *mut u8, len: usize, version: i16) -> i32`, which is `(param i32 i32 i32) (result i32)` in wasm. -pub trait WasmFn { +pub(crate) trait WasmFn { type Context; fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result; } -pub trait SmartModuleTransform: Send + Sync { +pub(crate) trait SmartModuleTransform: Send + Sync { /// transform records fn process( &mut self, @@ -57,13 +61,13 @@ impl + Any, I: WasmInstance> DowncastableTransform } } -pub struct SimpleTransformImpl { +pub(crate) struct SimpleTransform { name: String, func: F, } impl + Send + Sync> SmartModuleTransform - for SimpleTransformImpl + for SimpleTransform { fn process( &mut self, @@ -89,15 +93,15 @@ impl + Send + Sync> SmartModule } } -impl SimpleTransformImpl { - pub(crate) fn try_instantiate( +impl SimpleTransform { + pub(crate) fn try_instantiate( name: &str, instance: &mut I, - ctx: &mut ::Context, + ctx: &mut C, ) -> Result> where - I: WasmInstance, - F: WasmFn, + I: WasmInstance, + F: WasmFn, { let func = instance .get_fn(name, ctx)? @@ -109,6 +113,45 @@ impl SimpleTransformImpl { } } +pub(crate) const FILTER_FN_NAME: &str = "filter"; +pub(crate) const MAP_FN_NAME: &str = "map"; +pub(crate) const FILTER_MAP_FN_NAME: &str = "filter_map"; +pub(crate) const ARRAY_MAP_FN_NAME: &str = "array_map"; + +pub(crate) fn create_transform( + instance: &mut I, + ctx: &mut C, + _initial_data: SmartModuleInitialData, +) -> Result>> +where + I: WasmInstance, +{ + if let Some(tr) = SimpleTransform::try_instantiate(FILTER_FN_NAME, instance, ctx)? + .map(|transform| Box::new(transform) as Box>) + { + Ok(tr) + } else if let Some(tr) = SimpleTransform::try_instantiate(MAP_FN_NAME, instance, ctx)? + .map(|transform| Box::new(transform) as Box>) + { + Ok(tr) + } else if let Some(tr) = SimpleTransform::try_instantiate(FILTER_MAP_FN_NAME, instance, ctx)? + .map(|transform| Box::new(transform) as Box>) + { + Ok(tr) + } else if let Some(tr) = SimpleTransform::try_instantiate(ARRAY_MAP_FN_NAME, instance, ctx)? + .map(|transform| Box::new(transform) as Box>) + { + Ok(tr) + // TODO: AGGREGATE + // } else if let Some(tr) = SmartModuleAggregate::try_instantiate(ctx, initial_data, store)? + // .map(|transform| Box::new(transform) as Box>) + // { + // Ok(tr) + } else { + Err(EngineError::UnknownSmartModule.into()) + } +} + pub(crate) const INIT_FN_NAME: &str = "init"; pub(crate) struct SmartModuleInit(F); @@ -220,87 +263,3 @@ impl, F: WasmFn + Send + Sync> SmartModuleInstance, - params: SmartModuleExtraParams, - version: i16, - } - pub struct WasmTimeContext { - store: Store<()>, - } - - pub type WasmTimeFn = wasmtime::TypedFunc<(i32, i32, i32), i32>; - - impl super::WasmInstance for WasmTimeInstance { - type Context = WasmTimeContext; - - type Func = WasmTimeFn; - - fn get_fn(&self, name: &str, ctx: &mut Self::Context) -> Result> { - match self.instance.get_func(&mut ctx.store, name) { - Some(func) => { - // check type signature - func.typed(&mut ctx.store) - .or_else(|_| func.typed(&ctx.store)) - .map(|f| Some(f)) - } - None => Ok(None), - } - } - - fn write_input( - &mut self, - input: &E, - ctx: &mut Self::Context, - ) -> anyhow::Result<(i32, i32, i32)> { - self.records_cb.clear(); - let mut input_data = Vec::new(); - input.encode(&mut input_data, self.version)?; - debug!( - len = input_data.len(), - version = self.version, - "input encoded" - ); - let array_ptr = crate::engine::wasmtime::memory::copy_memory_to_instance( - &mut ctx.store, - &self.instance, - &input_data, - )?; - let length = input_data.len(); - Ok((array_ptr as i32, length as i32, self.version as i32)) - } - - fn read_output(&mut self, ctx: &mut Self::Context) -> Result { - let bytes = self - .records_cb - .get() - .and_then(|m| m.copy_memory_from(&ctx.store).ok()) - .unwrap_or_default(); - let mut output = D::default(); - output.decode(&mut std::io::Cursor::new(bytes), self.version)?; - Ok(output) - } - } - - impl super::WasmFn for WasmTimeFn { - type Context = WasmTimeContext; - - fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result { - WasmTimeFn::call(self, &mut ctx.store, (ptr, len, version)) - } - } -} diff --git a/crates/fluvio-smartengine/src/engine/wasmedge/imp.rs b/crates/fluvio-smartengine/src/engine/wasmedge/imp.rs new file mode 100644 index 0000000000..da9f978f82 --- /dev/null +++ b/crates/fluvio-smartengine/src/engine/wasmedge/imp.rs @@ -0,0 +1,153 @@ +use tracing::debug; +use wasmedge_sdk::error::HostFuncError; +use wasmedge_sdk::types::Val; +use wasmedge_sdk::{ + Executor, Func, Module, Store, CallingFrame, WasmValue, Caller, ImportObjectBuilder, +}; + +use crate::engine::wasmedge::instance::{RecordsCallBack, RecordsMemory}; + +use crate::engine::common::{WasmFn, WasmInstance}; +use crate::engine::error::EngineError; + +use anyhow::Result; +use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleExtraParams}; +use std::sync::Arc; + +pub(crate) struct WasmedgeInstance { + instance: wasmedge_sdk::Instance, + records_cb: Arc, + params: SmartModuleExtraParams, + version: i16, +} + +pub(crate) struct WasmedgeContext { + pub engine: Executor, +} + +pub type WasmedgeFn = Func; + +impl WasmInstance for WasmedgeInstance { + type Context = WasmedgeContext; + + type Func = WasmedgeFn; + + fn get_fn(&self, name: &str, _ctx: &mut Self::Context) -> Result> { + match self.instance.func(name) { + // check type signature + Some(func) => Ok(Some(func)), + None => Ok(None), + } + } + + fn write_input( + &mut self, + input: &E, + ctx: &mut Self::Context, + ) -> Result<(i32, i32, i32)> { + self.records_cb.clear(); + let mut input_data = Vec::new(); + input.encode(&mut input_data, self.version)?; + debug!( + len = input_data.len(), + version = self.version, + "input encoded" + ); + let array_ptr = + super::memory::copy_memory_to_instance(&ctx.engine, &self.instance, &input_data)?; + let length = input_data.len(); + Ok((array_ptr, length as i32, self.version as i32)) + } + + fn read_output( + &mut self, + _ctx: &mut Self::Context, + ) -> Result { + let bytes = self + .records_cb + .get() + .and_then(|m| m.copy_memory_from().ok()) + .unwrap_or_default(); + let mut output = D::default(); + output.decode(&mut std::io::Cursor::new(bytes), self.version)?; + Ok(output) + } + + fn params(&self) -> SmartModuleExtraParams { + self.params.clone() + } +} + +impl WasmFn for WasmedgeFn { + type Context = WasmedgeContext; + + fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result { + let res = self.call( + &ctx.engine, + vec![ + Val::I32(ptr).into(), + Val::I32(len).into(), + Val::I32(version).into(), + ], + )?; + Ok(res[0].to_i32()) + } +} + +impl WasmedgeInstance { + /// instantiate new module instance that contain context + pub(crate) fn instantiate( + store: &mut Store, + executor: &mut Executor, + module: Module, + params: SmartModuleExtraParams, + version: i16, + ) -> Result { + debug!("creating WasmModuleInstance"); + let cb = Arc::new(RecordsCallBack::new()); + let records_cb = cb.clone(); + + // See crates/fluvio-smartmodule-derive/src/generator/transform.rs for copy_records + let copy_records_fn = move |caller: CallingFrame, + inputs: Vec| + -> Result, HostFuncError> { + assert_eq!(inputs.len(), 2); + let ptr = inputs[0].to_i32() as u32; + let len = inputs[1].to_i32() as u32; + + debug!(len, "callback from wasm filter"); + let caller = Caller::new(caller); + let memory = caller.memory(0).unwrap(); + + let records = RecordsMemory { ptr, len, memory }; + cb.set(records); + Ok(vec![]) + }; + + let import = ImportObjectBuilder::new() + .with_func::<(i32, i32), ()>("copy_records", copy_records_fn) + .map_err(|e| EngineError::Instantiate(e.into()))? + .build("env") + .map_err(|e| EngineError::Instantiate(e.into()))?; + + debug!("instantiating WASMtime"); + store + .register_import_module(executor, &import) + .map_err(|e| EngineError::Instantiate(e.into()))?; + let instance = store + .register_active_module(executor, &module) + .map_err(|e| EngineError::Instantiate(e.into()))?; + + // This is a hack to avoid them being dropped + // FIXME: manage their lifetimes + std::mem::forget(import); + std::mem::forget(module); + + Ok(Self { + instance, + records_cb, + params, + version, + }) + } +} diff --git a/crates/fluvio-smartengine/src/engine/wasmedge/init.rs b/crates/fluvio-smartengine/src/engine/wasmedge/init.rs new file mode 100644 index 0000000000..48d9bff5b1 --- /dev/null +++ b/crates/fluvio-smartengine/src/engine/wasmedge/init.rs @@ -0,0 +1,11 @@ +use std::convert::TryFrom; +use std::fmt::Debug; + +use anyhow::{Result, Ok}; +use fluvio_smartmodule::dataplane::smartmodule::{ + SmartModuleInitInput, SmartModuleInitOutput, SmartModuleInitErrorStatus, +}; + +use crate::engine::common::{WasmFn, WasmInstance}; + +use super::instance::SmartModuleInstanceContext; diff --git a/crates/fluvio-smartengine/src/engine/wasmedge/mod.rs b/crates/fluvio-smartengine/src/engine/wasmedge/mod.rs index 5645d8f096..770d008ae9 100644 --- a/crates/fluvio-smartengine/src/engine/wasmedge/mod.rs +++ b/crates/fluvio-smartengine/src/engine/wasmedge/mod.rs @@ -1,168 +1,26 @@ mod instance; mod transforms; -use instance::*; + mod memory; +mod imp; use tracing::debug; -use wasmedge_sdk::error::HostFuncError; -use wasmedge_sdk::types::Val; -use wasmedge_sdk::{ - Executor, Func, Module, Store, CallingFrame, WasmValue, Caller, ImportObjectBuilder, -}; + +use wasmedge_sdk::{Executor, Module, Store}; use crate::{SmartModuleChainBuilder}; -use crate::engine::common::WasmFn; -use crate::engine::error::EngineError; + use crate::metrics::SmartModuleChainMetrics; use anyhow::Result; -use fluvio_smartmodule::dataplane::smartmodule::{ - SmartModuleInput, SmartModuleOutput, SmartModuleExtraParams, -}; -use std::sync::Arc; +use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput}; -use self::transforms::create_transform; +use self::imp::{WasmedgeFn, WasmedgeInstance, WasmedgeContext}; + +use super::common::create_transform; type SmartModuleInit = super::common::SmartModuleInit; type SmartModuleInstance = super::common::SmartModuleInstance; -pub struct WasmedgeInstance { - instance: wasmedge_sdk::Instance, - records_cb: Arc, - params: SmartModuleExtraParams, - version: i16, -} - -pub struct WasmedgeContext { - engine: Executor, -} - -pub type WasmedgeFn = Func; - -impl super::common::WasmInstance for WasmedgeInstance { - type Context = WasmedgeContext; - - type Func = WasmedgeFn; - - fn get_fn(&self, name: &str, _ctx: &mut Self::Context) -> Result> { - match self.instance.func(name) { - // check type signature - Some(func) => Ok(Some(func)), - None => Ok(None), - } - } - - fn write_input( - &mut self, - input: &E, - ctx: &mut Self::Context, - ) -> Result<(i32, i32, i32)> { - self.records_cb.clear(); - let mut input_data = Vec::new(); - input.encode(&mut input_data, self.version)?; - debug!( - len = input_data.len(), - version = self.version, - "input encoded" - ); - let array_ptr = - memory::copy_memory_to_instance(&mut ctx.engine, &self.instance, &input_data)?; - let length = input_data.len(); - Ok((array_ptr, length as i32, self.version as i32)) - } - - fn read_output( - &mut self, - _ctx: &mut Self::Context, - ) -> Result { - let bytes = self - .records_cb - .get() - .and_then(|m| m.copy_memory_from().ok()) - .unwrap_or_default(); - let mut output = D::default(); - output.decode(&mut std::io::Cursor::new(bytes), self.version)?; - Ok(output) - } - - fn params(&self) -> SmartModuleExtraParams { - self.params.clone() - } -} - -impl WasmFn for WasmedgeFn { - type Context = WasmedgeContext; - - fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result { - let res = self.call( - &ctx.engine, - vec![ - Val::I32(ptr).into(), - Val::I32(len).into(), - Val::I32(version).into(), - ], - )?; - Ok(res[0].to_i32()) - } -} - -impl WasmedgeInstance { - /// instantiate new module instance that contain context - pub(crate) fn instantiate( - store: &mut Store, - executor: &mut Executor, - module: Module, - params: SmartModuleExtraParams, - version: i16, - ) -> Result { - debug!("creating WasmModuleInstance"); - let cb = Arc::new(RecordsCallBack::new()); - let records_cb = cb.clone(); - - // See crates/fluvio-smartmodule-derive/src/generator/transform.rs for copy_records - let copy_records_fn = move |caller: CallingFrame, - inputs: Vec| - -> Result, HostFuncError> { - assert_eq!(inputs.len(), 2); - let ptr = inputs[0].to_i32() as u32; - let len = inputs[1].to_i32() as u32; - - debug!(len, "callback from wasm filter"); - let caller = Caller::new(caller); - let memory = caller.memory(0).unwrap(); - - let records = RecordsMemory { ptr, len, memory }; - cb.set(records); - Ok(vec![]) - }; - - let import = ImportObjectBuilder::new() - .with_func::<(i32, i32), ()>("copy_records", copy_records_fn) - .map_err(|e| EngineError::Instantiate(e.into()))? - .build("env") - .map_err(|e| EngineError::Instantiate(e.into()))?; - - debug!("instantiating WASMtime"); - store - .register_import_module(executor, &import) - .map_err(|e| EngineError::Instantiate(e.into()))?; - let instance = store - .register_active_module(executor, &module) - .map_err(|e| EngineError::Instantiate(e.into()))?; - - // This is a hack to avoid them being dropped - // FIXME: manage their lifetimes - std::mem::forget(import); - std::mem::forget(module); - - Ok(Self { - instance, - records_cb, - params, - version, - }) - } -} - #[derive(Clone)] pub struct SmartEngineImp(); diff --git a/crates/fluvio-smartengine/src/engine/wasmedge/transforms/filter.rs b/crates/fluvio-smartengine/src/engine/wasmedge/transforms/filter.rs index b5f975a8c7..3439a864eb 100644 --- a/crates/fluvio-smartengine/src/engine/wasmedge/transforms/filter.rs +++ b/crates/fluvio-smartengine/src/engine/wasmedge/transforms/filter.rs @@ -1,5 +1,3 @@ -const FILTER_FN_NAME: &str = "filter"; - #[cfg(test)] mod test { @@ -7,6 +5,7 @@ mod test { use fluvio_smartmodule::{dataplane::smartmodule::SmartModuleInput, Record}; + use crate::engine::common::FILTER_FN_NAME; use crate::engine::metrics::SmartModuleChainMetrics; use crate::engine::{SmartEngine, SmartModuleChainBuilder}; use crate::engine::config::SmartModuleConfig; @@ -34,7 +33,7 @@ mod test { assert_eq!( chain.instances().first().expect("first").transform().name(), - super::FILTER_FN_NAME + FILTER_FN_NAME ); let metrics = SmartModuleChainMetrics::default(); @@ -93,7 +92,7 @@ mod test { let instance = chain.instances().first().expect("first"); - assert_eq!(instance.transform().name(), super::FILTER_FN_NAME); + assert_eq!(instance.transform().name(), FILTER_FN_NAME); assert!(instance.get_init().is_some()); diff --git a/crates/fluvio-smartengine/src/engine/wasmedge/transforms/mod.rs b/crates/fluvio-smartengine/src/engine/wasmedge/transforms/mod.rs index 6b8a808bd9..da681c05ce 100644 --- a/crates/fluvio-smartengine/src/engine/wasmedge/transforms/mod.rs +++ b/crates/fluvio-smartengine/src/engine/wasmedge/transforms/mod.rs @@ -1,24 +1 @@ mod filter; - -use crate::engine::{ - error::EngineError, - SmartModuleInitialData, - common::{DowncastableTransform, SimpleTransformImpl}, -}; - -use super::{WasmedgeInstance, WasmedgeContext}; -use anyhow::Result; - -pub(crate) fn create_transform( - instance: &mut WasmedgeInstance, - ctx: &mut WasmedgeContext, - _initial_data: SmartModuleInitialData, -) -> Result>> { - if let Some(tr) = SimpleTransformImpl::try_instantiate("filter", instance, ctx)? - .map(|transform| Box::new(transform) as Box>) - { - Ok(tr) - } else { - Err(EngineError::UnknownSmartModule.into()) - } -} diff --git a/crates/fluvio-smartengine/src/engine/wasmtime/imp.rs b/crates/fluvio-smartengine/src/engine/wasmtime/imp.rs new file mode 100644 index 0000000000..c2e444c161 --- /dev/null +++ b/crates/fluvio-smartengine/src/engine/wasmtime/imp.rs @@ -0,0 +1,88 @@ +use anyhow::Result; +use fluvio_protocol::{Encoder, Decoder}; +use tracing::debug; + +use std::sync::Arc; + +use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams; +use wasmtime::{Instance, Store}; + +use crate::engine::{ + wasmtime::instance::RecordsCallBack, + common::{WasmFn, WasmInstance}, +}; + +pub struct WasmTimeInstance { + instance: Instance, + records_cb: Arc, + params: SmartModuleExtraParams, + version: i16, +} +pub struct WasmTimeContext { + store: Store<()>, +} + +pub type WasmTimeFn = wasmtime::TypedFunc<(i32, i32, i32), i32>; + +impl WasmInstance for WasmTimeInstance { + type Context = WasmTimeContext; + + type Func = WasmTimeFn; + + fn get_fn(&self, name: &str, ctx: &mut Self::Context) -> Result> { + match self.instance.get_func(&mut ctx.store, name) { + Some(func) => { + // check type signature + func.typed(&mut ctx.store) + .or_else(|_| func.typed(&ctx.store)) + .map(|f| Some(f)) + } + None => Ok(None), + } + } + + fn write_input( + &mut self, + input: &E, + ctx: &mut Self::Context, + ) -> anyhow::Result<(i32, i32, i32)> { + self.records_cb.clear(); + let mut input_data = Vec::new(); + input.encode(&mut input_data, self.version)?; + debug!( + len = input_data.len(), + version = self.version, + "input encoded" + ); + let array_ptr = crate::engine::wasmtime::memory::copy_memory_to_instance( + &mut ctx.store, + &self.instance, + &input_data, + )?; + let length = input_data.len(); + Ok((array_ptr as i32, length as i32, self.version as i32)) + } + + fn read_output(&mut self, ctx: &mut Self::Context) -> Result { + let bytes = self + .records_cb + .get() + .and_then(|m| m.copy_memory_from(&ctx.store).ok()) + .unwrap_or_default(); + let mut output = D::default(); + output.decode(&mut std::io::Cursor::new(bytes), self.version)?; + Ok(output) + } + + fn params(&self) -> SmartModuleExtraParams { + self.params.clone() + } +} + +impl WasmFn for WasmTimeFn { + type Context = WasmTimeContext; + + fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result { + WasmTimeFn::call(self, &mut ctx.store, (ptr, len, version)) + } +} diff --git a/crates/fluvio-smartengine/src/engine/wasmtime/mod.rs b/crates/fluvio-smartengine/src/engine/wasmtime/mod.rs index e949cf9a22..2ea428ea47 100644 --- a/crates/fluvio-smartengine/src/engine/wasmtime/mod.rs +++ b/crates/fluvio-smartengine/src/engine/wasmtime/mod.rs @@ -4,6 +4,7 @@ pub(crate) mod init; pub(crate) mod state; pub(crate) mod engine; pub(crate) mod instance; +pub(crate) mod imp; pub use engine::{SmartEngineImp, initialize_imp, SmartModuleChainInstanceImp}; use super::*;