diff --git a/crates/fluvio-smartengine/src/engine/common.rs b/crates/fluvio-smartengine/src/engine/common.rs new file mode 100644 index 00000000000..714ef9f9cef --- /dev/null +++ b/crates/fluvio-smartengine/src/engine/common.rs @@ -0,0 +1,190 @@ +use std::any::Any; + +use anyhow::Result; +use fluvio_protocol::{Encoder, Decoder}; +use fluvio_smartmodule::dataplane::smartmodule::{ + SmartModuleInput, SmartModuleOutput, SmartModuleTransformErrorStatus, +}; + +pub trait WasmInstance { + type Context; + type Func: WasmFn; + + fn get_fn(&self, name: &str, ctx: &mut Self::Context) -> Result>; + + fn write_input( + &mut self, + input: &E, + ctx: &mut Self::Context, + ) -> Result<(i32, i32, i32)>; + fn read_output(&mut self, ctx: &mut Self::Context) -> Result; +} + +/// All smartmodule wasm functions have the same ABI: +/// `(ptr: *mut u8, len: usize, version: i16) -> i32`, which is `(param i32 i32 i32) (result i32)` in wasm. +pub trait WasmFn { + type Context; + fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result; +} + +pub trait SmartModuleTransform: Send + Sync { + /// transform records + fn process( + &mut self, + input: SmartModuleInput, + instance: &mut I, + ctx: &mut I::Context, + ) -> Result; + + /// return name of transform, this is used for identifying transform and debugging + fn name(&self) -> &str; +} + +// In order turn to any, need following magic trick +pub(crate) trait DowncastableTransform: + SmartModuleTransform + Any +{ + fn as_any(&self) -> &dyn Any; +} + +impl + Any, I: WasmInstance> DowncastableTransform for T { + fn as_any(&self) -> &dyn Any { + self + } +} + +pub struct SimpleTransformImpl { + name: String, + func: F, +} + +impl + Send + Sync> SmartModuleTransform + for SimpleTransformImpl +{ + fn process( + &mut self, + input: SmartModuleInput, + instance: &mut I, + ctx: &mut I::Context, + ) -> Result { + let (ptr, len, version) = instance.write_input(&input, ctx)?; + let output = self.func.call(ptr, len, version, ctx)?; + + if output < 0 { + let internal_error = SmartModuleTransformErrorStatus::try_from(output) + .unwrap_or(SmartModuleTransformErrorStatus::UnknownError); + return Err(internal_error.into()); + } + + let output: SmartModuleOutput = instance.read_output(ctx)?; + Ok(output) + } + + fn name(&self) -> &str { + &self.name + } +} + +impl SimpleTransformImpl { + pub(crate) fn try_instantiate( + name: &str, + instance: &mut I, + ctx: &mut ::Context, + ) -> Result> + where + I: WasmInstance, + F: WasmFn, + { + let func = instance + .get_fn(name, ctx)? + .ok_or_else(|| anyhow::anyhow!("{} not found", name))?; + Ok(Some(Self { + name: name.to_owned(), + func, + })) + } +} + +mod wasmtime { + use anyhow::Result; + use fluvio_protocol::{Encoder, Decoder}; + use tracing::debug; + + use std::sync::Arc; + + use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams; + use wasmtime::{Instance, Store}; + + use super::super::instance::RecordsCallBack; + + pub struct WasmTimeInstance { + instance: Instance, + records_cb: Arc, + params: SmartModuleExtraParams, + version: i16, + } + pub struct WasmTimeContext { + store: Store<()>, + } + + pub type WasmTimeFn = wasmtime::TypedFunc<(i32, i32, i32), i32>; + + impl super::WasmInstance for WasmTimeInstance { + type Context = WasmTimeContext; + + type Func = WasmTimeFn; + + fn get_fn(&self, name: &str, ctx: &mut Self::Context) -> Result> { + match self.instance.get_func(&mut ctx.store, name) { + Some(func) => { + // check type signature + func.typed(&mut ctx.store) + .or_else(|_| func.typed(&ctx.store)) + .map(|f| Some(f)) + } + None => Ok(None), + } + } + + fn write_input( + &mut self, + input: &E, + ctx: &mut Self::Context, + ) -> anyhow::Result<(i32, i32, i32)> { + self.records_cb.clear(); + let mut input_data = Vec::new(); + input.encode(&mut input_data, self.version)?; + debug!( + len = input_data.len(), + version = self.version, + "input encoded" + ); + let array_ptr = super::super::memory::copy_memory_to_instance( + &mut ctx.store, + &self.instance, + &input_data, + )?; + let length = input_data.len(); + Ok((array_ptr as i32, length as i32, self.version as i32)) + } + + fn read_output(&mut self, ctx: &mut Self::Context) -> Result { + let bytes = self + .records_cb + .get() + .and_then(|m| m.copy_memory_from(&ctx.store).ok()) + .unwrap_or_default(); + let mut output = D::default(); + output.decode(&mut std::io::Cursor::new(bytes), self.version)?; + Ok(output) + } + } + + impl super::WasmFn for WasmTimeFn { + type Context = WasmTimeContext; + + fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result { + WasmTimeFn::call(self, &mut ctx.store, (ptr, len, version)) + } + } +} diff --git a/crates/fluvio-smartengine/src/config.rs b/crates/fluvio-smartengine/src/engine/config.rs similarity index 100% rename from crates/fluvio-smartengine/src/config.rs rename to crates/fluvio-smartengine/src/engine/config.rs diff --git a/crates/fluvio-smartengine/src/engine.rs b/crates/fluvio-smartengine/src/engine/engine.rs similarity index 97% rename from crates/fluvio-smartengine/src/engine.rs rename to crates/fluvio-smartengine/src/engine/engine.rs index 3c628e00fdb..bf0f4183eda 100644 --- a/crates/fluvio-smartengine/src/engine.rs +++ b/crates/fluvio-smartengine/src/engine/engine.rs @@ -6,13 +6,13 @@ use wasmtime::{Engine, Module}; use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput}; -use crate::config::*; -use crate::init::SmartModuleInit; -use crate::instance::{SmartModuleInstance, SmartModuleInstanceContext}; +use super::config::*; +use super::init::SmartModuleInit; +use super::instance::{SmartModuleInstance, SmartModuleInstanceContext}; -use crate::metrics::SmartModuleChainMetrics; -use crate::state::WasmState; -use crate::transforms::create_transform; +use super::metrics::SmartModuleChainMetrics; +use super::state::WasmState; +use super::transforms::create_transform; #[derive(Clone)] pub struct SmartEngine(Engine); @@ -174,7 +174,7 @@ mod chaining_test { use fluvio_smartmodule::{dataplane::smartmodule::SmartModuleInput, Record}; - use crate::{ + use super::super::{ SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData, metrics::SmartModuleChainMetrics, }; @@ -182,7 +182,7 @@ mod chaining_test { const SM_FILTER_INIT: &str = "fluvio_smartmodule_filter_init"; const SM_MAP: &str = "fluvio_smartmodule_map"; - use crate::fixture::read_wasm_module; + use super::super::fixture::read_wasm_module; #[ignore] #[test] diff --git a/crates/fluvio-smartengine/src/error.rs b/crates/fluvio-smartengine/src/engine/error.rs similarity index 100% rename from crates/fluvio-smartengine/src/error.rs rename to crates/fluvio-smartengine/src/engine/error.rs diff --git a/crates/fluvio-smartengine/src/fixture.rs b/crates/fluvio-smartengine/src/engine/fixture.rs similarity index 100% rename from crates/fluvio-smartengine/src/fixture.rs rename to crates/fluvio-smartengine/src/engine/fixture.rs diff --git a/crates/fluvio-smartengine/src/init.rs b/crates/fluvio-smartengine/src/engine/init.rs similarity index 97% rename from crates/fluvio-smartengine/src/init.rs rename to crates/fluvio-smartengine/src/engine/init.rs index 08e6d7fa705..dbdd677cac9 100644 --- a/crates/fluvio-smartengine/src/init.rs +++ b/crates/fluvio-smartengine/src/engine/init.rs @@ -7,7 +7,7 @@ use fluvio_smartmodule::dataplane::smartmodule::{ }; use wasmtime::{AsContextMut, TypedFunc}; -use crate::instance::SmartModuleInstanceContext; +use super::instance::SmartModuleInstanceContext; pub(crate) const INIT_FN_NAME: &str = "init"; type WasmInitFn = TypedFunc<(i32, i32, u32), i32>; diff --git a/crates/fluvio-smartengine/src/instance.rs b/crates/fluvio-smartengine/src/engine/instance.rs similarity index 96% rename from crates/fluvio-smartengine/src/instance.rs rename to crates/fluvio-smartengine/src/engine/instance.rs index e76207545b5..8a6e6f43a34 100644 --- a/crates/fluvio-smartengine/src/instance.rs +++ b/crates/fluvio-smartengine/src/engine/instance.rs @@ -12,10 +12,10 @@ use fluvio_smartmodule::dataplane::smartmodule::{ SmartModuleExtraParams, SmartModuleInput, SmartModuleOutput, SmartModuleInitInput, }; -use crate::error::EngineError; -use crate::init::SmartModuleInit; -use crate::{WasmSlice, memory}; -use crate::state::WasmState; +use super::error::EngineError; +use super::init::SmartModuleInit; +use super::{WasmSlice, memory}; +use super::state::WasmState; pub(crate) struct SmartModuleInstance { ctx: SmartModuleInstanceContext, @@ -186,7 +186,7 @@ pub struct RecordsMemory { } impl RecordsMemory { - fn copy_memory_from(&self, store: impl AsContext) -> Result> { + pub fn copy_memory_from(&self, store: impl AsContext) -> Result> { let mut bytes = vec![0u8; self.len as u32 as usize]; self.memory.read(store, self.ptr as usize, &mut bytes)?; Ok(bytes) diff --git a/crates/fluvio-smartengine/src/memory.rs b/crates/fluvio-smartengine/src/engine/memory.rs similarity index 100% rename from crates/fluvio-smartengine/src/memory.rs rename to crates/fluvio-smartengine/src/engine/memory.rs diff --git a/crates/fluvio-smartengine/src/metrics.rs b/crates/fluvio-smartengine/src/engine/metrics.rs similarity index 100% rename from crates/fluvio-smartengine/src/metrics.rs rename to crates/fluvio-smartengine/src/engine/metrics.rs diff --git a/crates/fluvio-smartengine/src/engine/mod.rs b/crates/fluvio-smartengine/src/engine/mod.rs new file mode 100644 index 00000000000..331f5f1467e --- /dev/null +++ b/crates/fluvio-smartengine/src/engine/mod.rs @@ -0,0 +1,40 @@ +#[cfg(test)] +mod fixture; +mod config; + +pub type WasmSlice = (i32, i32, u32); +pub type Version = i16; + +cfg_if::cfg_if! { + // TODO: turn on this check when ready + // #[cfg(all(feature = "wasmedge_engine", feature = "wasmtime_engine"))] + // compile_error!( + // "Both `wasmedge_engine` and `wasmtime_engine` features are enabled, but \ + // only one WASM runtime is supported at a time" + // ); + + + if #[cfg(feature = "wasmtime-engine")] { + pub(crate) mod memory; + pub(crate) mod transforms; + pub(crate) mod init; + pub(crate) mod error; + pub mod metrics; + mod state; + mod engine; + pub use config::{SmartModuleConfig, SmartModuleConfigBuilder, SmartModuleConfigBuilderError, SmartModuleInitialData}; + pub mod instance; + + pub(crate) mod common; + + // TODO: move wasmtime engine to a module; or maybe add some common abstraction? + pub use engine::{SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance}; + + + } else if #[cfg(feature = "wasmedge-engine")] { + mod wasmedge_engine; + mod test_use { + pub use super::wasmedge_engine::{SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance}; + } + } +} diff --git a/crates/fluvio-smartengine/src/state.rs b/crates/fluvio-smartengine/src/engine/state.rs similarity index 100% rename from crates/fluvio-smartengine/src/state.rs rename to crates/fluvio-smartengine/src/engine/state.rs diff --git a/crates/fluvio-smartengine/src/transforms/aggregate.rs b/crates/fluvio-smartengine/src/engine/transforms/aggregate.rs similarity index 98% rename from crates/fluvio-smartengine/src/transforms/aggregate.rs rename to crates/fluvio-smartengine/src/engine/transforms/aggregate.rs index 028c5791d1b..1e64d28c2f9 100644 --- a/crates/fluvio-smartengine/src/transforms/aggregate.rs +++ b/crates/fluvio-smartengine/src/engine/transforms/aggregate.rs @@ -9,7 +9,7 @@ use fluvio_smartmodule::dataplane::smartmodule::{ SmartModuleInput, SmartModuleOutput, SmartModuleAggregateInput, SmartModuleAggregateOutput, SmartModuleTransformErrorStatus, }; -use crate::{ +use super::super::{ instance::{SmartModuleInstanceContext, SmartModuleTransform}, SmartModuleInitialData, state::WasmState, @@ -111,14 +111,14 @@ mod test { Record, }; - use crate::{ + use crate::engine::{ SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData, metrics::SmartModuleChainMetrics, }; const SM_AGGEGRATE: &str = "fluvio_smartmodule_aggregate"; - use crate::fixture::read_wasm_module; + use crate::engine::fixture::read_wasm_module; #[ignore] #[test] diff --git a/crates/fluvio-smartengine/src/transforms/array_map.rs b/crates/fluvio-smartengine/src/engine/transforms/array_map.rs similarity index 95% rename from crates/fluvio-smartengine/src/transforms/array_map.rs rename to crates/fluvio-smartengine/src/engine/transforms/array_map.rs index 61272916000..a9c58a8b9a6 100644 --- a/crates/fluvio-smartengine/src/transforms/array_map.rs +++ b/crates/fluvio-smartengine/src/engine/transforms/array_map.rs @@ -8,14 +8,14 @@ mod test { Record, }; - use crate::{ + use crate::engine::{ SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics, transforms::simple_transform::ARRAY_MAP_FN_NAME, }; const SM_ARRAY_MAP: &str = "fluvio_smartmodule_array_map_array"; - use crate::fixture::read_wasm_module; + use crate::engine::fixture::read_wasm_module; #[ignore] #[test] diff --git a/crates/fluvio-smartengine/src/transforms/filter.rs b/crates/fluvio-smartengine/src/engine/transforms/filter.rs similarity index 98% rename from crates/fluvio-smartengine/src/transforms/filter.rs rename to crates/fluvio-smartengine/src/engine/transforms/filter.rs index 0952cbdda1d..6da49c5edf6 100644 --- a/crates/fluvio-smartengine/src/transforms/filter.rs +++ b/crates/fluvio-smartengine/src/engine/transforms/filter.rs @@ -8,7 +8,7 @@ mod test { Record, }; - use crate::{ + use crate::engine::{ SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics, transforms::simple_transform::FILTER_FN_NAME, }; @@ -16,7 +16,7 @@ mod test { const SM_FILTER: &str = "fluvio_smartmodule_filter"; const SM_FILTER_INIT: &str = "fluvio_smartmodule_filter_init"; - use crate::fixture::read_wasm_module; + use crate::engine::fixture::read_wasm_module; #[ignore] #[test] diff --git a/crates/fluvio-smartengine/src/transforms/filter_map.rs b/crates/fluvio-smartengine/src/engine/transforms/filter_map.rs similarity index 94% rename from crates/fluvio-smartengine/src/transforms/filter_map.rs rename to crates/fluvio-smartengine/src/engine/transforms/filter_map.rs index 245d74f5813..ed95b8bfd90 100644 --- a/crates/fluvio-smartengine/src/transforms/filter_map.rs +++ b/crates/fluvio-smartengine/src/engine/transforms/filter_map.rs @@ -8,14 +8,14 @@ mod test { Record, }; - use crate::{ + use crate::engine::{ SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics, transforms::simple_transform::FILTER_MAP_FN_NAME, }; const SM_FILTER_MAP: &str = "fluvio_smartmodule_filter_map"; - use crate::fixture::read_wasm_module; + use crate::engine::fixture::read_wasm_module; #[ignore] #[test] diff --git a/crates/fluvio-smartengine/src/transforms/map.rs b/crates/fluvio-smartengine/src/engine/transforms/map.rs similarity index 94% rename from crates/fluvio-smartengine/src/transforms/map.rs rename to crates/fluvio-smartengine/src/engine/transforms/map.rs index d94eea75869..9400e2f3803 100644 --- a/crates/fluvio-smartengine/src/transforms/map.rs +++ b/crates/fluvio-smartengine/src/engine/transforms/map.rs @@ -8,11 +8,11 @@ mod test { Record, }; - use crate::{ + use crate::engine::{ SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics, transforms::simple_transform::MAP_FN_NAME, }; - use crate::fixture::read_wasm_module; + use crate::engine::fixture::read_wasm_module; const SM_MAP: &str = "fluvio_smartmodule_map"; diff --git a/crates/fluvio-smartengine/src/transforms/mod.rs b/crates/fluvio-smartengine/src/engine/transforms/mod.rs similarity index 98% rename from crates/fluvio-smartengine/src/transforms/mod.rs rename to crates/fluvio-smartengine/src/engine/transforms/mod.rs index b5a90cc16f2..4bcb18390b5 100644 --- a/crates/fluvio-smartengine/src/transforms/mod.rs +++ b/crates/fluvio-smartengine/src/engine/transforms/mod.rs @@ -11,7 +11,7 @@ mod instance { use anyhow::{Result}; use wasmtime::AsContextMut; - use crate::{ + use crate::engine::{ instance::{SmartModuleInstanceContext, DowncastableTransform}, error::EngineError, SmartModuleInitialData, diff --git a/crates/fluvio-smartengine/src/transforms/simple_transform.rs b/crates/fluvio-smartengine/src/engine/transforms/simple_transform.rs similarity index 99% rename from crates/fluvio-smartengine/src/transforms/simple_transform.rs rename to crates/fluvio-smartengine/src/engine/transforms/simple_transform.rs index 0aebe92d311..1fe4be8d31e 100644 --- a/crates/fluvio-smartengine/src/transforms/simple_transform.rs +++ b/crates/fluvio-smartengine/src/engine/transforms/simple_transform.rs @@ -4,7 +4,7 @@ use fluvio_smartmodule::dataplane::smartmodule::{ use wasmtime::{TypedFunc, AsContextMut}; use anyhow::Result; -use crate::{ +use crate::engine::{ instance::{SmartModuleInstanceContext, SmartModuleTransform}, state::WasmState, }; diff --git a/crates/fluvio-smartengine/src/wasmedge_engine/init.rs b/crates/fluvio-smartengine/src/engine/wasmedge_engine/init.rs similarity index 100% rename from crates/fluvio-smartengine/src/wasmedge_engine/init.rs rename to crates/fluvio-smartengine/src/engine/wasmedge_engine/init.rs diff --git a/crates/fluvio-smartengine/src/wasmedge_engine/instance.rs b/crates/fluvio-smartengine/src/engine/wasmedge_engine/instance.rs similarity index 81% rename from crates/fluvio-smartengine/src/wasmedge_engine/instance.rs rename to crates/fluvio-smartengine/src/engine/wasmedge_engine/instance.rs index 8a78836890d..976f5f1fdc0 100644 --- a/crates/fluvio-smartengine/src/wasmedge_engine/instance.rs +++ b/crates/fluvio-smartengine/src/engine/wasmedge_engine/instance.rs @@ -7,7 +7,9 @@ use wasmedge_sdk::{ Memory, Module, Store, WasmValue, }; +use super::{WasmedgeInstance, WasmedgeContext}; use super::init::SmartModuleInit; +use crate::common::DowncastableTransform; use crate::error::EngineError; use crate::metrics::SmartModuleChainMetrics; use crate::wasmedge_engine::memory; @@ -18,16 +20,16 @@ use std::any::Any; use std::fmt::{self, Debug}; use std::sync::{Arc, Mutex}; -pub struct SmartModuleInstance { - ctx: SmartModuleInstanceContext, - transform: Box, +pub(crate) struct SmartModuleInstance { + pub instance: WasmedgeInstance, + pub transform: Box>, // init: Option, } impl SmartModuleInstance { #[cfg(test)] #[allow(clippy::borrowed_box)] - pub(crate) fn transform(&self) -> &Box { + pub(crate) fn transform(&self) -> &Box> { &self.transform } @@ -37,12 +39,12 @@ impl SmartModuleInstance { } pub(crate) fn new( - ctx: SmartModuleInstanceContext, + instance: WasmedgeInstance, // init: Option, - transform: Box, + transform: Box>, ) -> Self { Self { - ctx, + instance, // init, transform, } @@ -51,9 +53,9 @@ impl SmartModuleInstance { pub(crate) fn process( &mut self, input: SmartModuleInput, - engine: &mut Executor, + ctx: &mut WasmedgeContext, ) -> Result { - self.transform.process(input, &mut self.ctx, engine) + self.transform.process(input, &mut self.instance, ctx) } } @@ -129,7 +131,7 @@ impl SmartModuleInstanceContext { pub(crate) fn write_input( &mut self, input: &E, - engine: &mut impl Engine, + engine: &impl Engine, ) -> Result> { self.records_cb.clear(); let mut input_data = Vec::new(); @@ -162,15 +164,15 @@ impl SmartModuleInstanceContext { // TODO: revise later to see whether Clone is necessary #[derive(Clone)] -pub struct RecordsMemory { - ptr: u32, - len: u32, - memory: Memory, +pub(crate) struct RecordsMemory { + pub ptr: u32, + pub len: u32, + pub memory: Memory, } impl RecordsMemory { - fn copy_memory_from(&self) -> Result> { - let mut bytes = self.memory.read(self.ptr, self.len)?; + pub(crate) fn copy_memory_from(&self) -> Result> { + let bytes = self.memory.read(self.ptr, self.len)?; Ok(bytes) } } @@ -197,27 +199,3 @@ impl RecordsCallBack { reader.clone() } } - -pub(crate) trait SmartModuleTransform: Send + Sync { - /// transform records - fn process( - &mut self, - input: SmartModuleInput, - ctx: &mut SmartModuleInstanceContext, - engine: &mut Executor, - ) -> Result; - - /// return name of transform, this is used for identifying transform and debugging - fn name(&self) -> &str; -} - -// In order turn to any, need following magic trick -pub(crate) trait DowncastableTransform: SmartModuleTransform + Any { - fn as_any(&self) -> &dyn Any; -} - -impl DowncastableTransform for T { - fn as_any(&self) -> &dyn Any { - self - } -} diff --git a/crates/fluvio-smartengine/src/wasmedge_engine/memory.rs b/crates/fluvio-smartengine/src/engine/wasmedge_engine/memory.rs similarity index 98% rename from crates/fluvio-smartengine/src/wasmedge_engine/memory.rs rename to crates/fluvio-smartengine/src/engine/wasmedge_engine/memory.rs index 092216666a5..be643c9324f 100644 --- a/crates/fluvio-smartengine/src/wasmedge_engine/memory.rs +++ b/crates/fluvio-smartengine/src/engine/wasmedge_engine/memory.rs @@ -10,7 +10,7 @@ const MEMORY: &str = "memory"; /// Copy a byte array into an instance's linear memory /// and return the offset relative to the module's memory. pub(crate) fn copy_memory_to_instance( - engine: &mut impl Engine, + engine: &impl Engine, instance: &Instance, bytes: &[u8], ) -> Result { diff --git a/crates/fluvio-smartengine/src/engine/wasmedge_engine/mod.rs b/crates/fluvio-smartengine/src/engine/wasmedge_engine/mod.rs new file mode 100644 index 00000000000..7a4716ebf94 --- /dev/null +++ b/crates/fluvio-smartengine/src/engine/wasmedge_engine/mod.rs @@ -0,0 +1,292 @@ +mod instance; +mod transforms; +use instance::*; +mod init; +use init::*; +mod memory; +use memory::*; + +use tracing::debug; +use wasmedge_sdk::error::HostFuncError; +use wasmedge_sdk::types::Val; +use wasmedge_sdk::{ + Executor, Func, Instance, Memory, Module, Store, CallingFrame, WasmValue, Caller, + ImportObjectBuilder, +}; + +use crate::common::WasmFn; +use crate::config::*; +use crate::error::EngineError; +use crate::metrics::SmartModuleChainMetrics; +use anyhow::Result; +use fluvio_smartmodule::dataplane::smartmodule::{ + SmartModuleInput, SmartModuleOutput, SmartModuleExtraParams, +}; +use std::any::Any; +use std::fmt::{self, Debug}; +use std::sync::{Arc, Mutex}; + +use self::transforms::create_transform; + +pub struct WasmedgeInstance { + instance: wasmedge_sdk::Instance, + records_cb: Arc, + params: SmartModuleExtraParams, + version: i16, +} + +pub struct WasmedgeContext { + engine: Executor, +} + +pub type WasmedgeFn = Func; + +impl super::common::WasmInstance for WasmedgeInstance { + type Context = WasmedgeContext; + + type Func = WasmedgeFn; + + fn get_fn(&self, name: &str, _ctx: &mut Self::Context) -> Result> { + match self.instance.func(name) { + // check type signature + Some(func) => Ok(Some(func)), + None => Ok(None), + } + } + + fn write_input( + &mut self, + input: &E, + ctx: &mut Self::Context, + ) -> Result<(i32, i32, i32)> { + self.records_cb.clear(); + let mut input_data = Vec::new(); + input.encode(&mut input_data, self.version)?; + debug!( + len = input_data.len(), + version = self.version, + "input encoded" + ); + let array_ptr = + memory::copy_memory_to_instance(&mut ctx.engine, &self.instance, &input_data)?; + let length = input_data.len(); + Ok((array_ptr as i32, length as i32, self.version as i32)) + } + + fn read_output( + &mut self, + _ctx: &mut Self::Context, + ) -> Result { + let bytes = self + .records_cb + .get() + .and_then(|m| m.copy_memory_from().ok()) + .unwrap_or_default(); + let mut output = D::default(); + output.decode(&mut std::io::Cursor::new(bytes), self.version)?; + Ok(output) + } +} + +impl WasmFn for WasmedgeFn { + type Context = WasmedgeContext; + + fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result { + let res = self.call( + &ctx.engine, + vec![ + Val::I32(ptr as i32).into(), + Val::I32(len as i32).into(), + Val::I32(version as i32).into(), + ], + )?; + Ok(res[0].to_i32()) + } +} + +impl WasmedgeInstance { + /// instantiate new module instance that contain context + pub(crate) fn instantiate( + store: &mut Store, + executor: &mut Executor, + module: Module, + params: SmartModuleExtraParams, + version: i16, + ) -> Result { + debug!("creating WasmModuleInstance"); + let cb = Arc::new(RecordsCallBack::new()); + let records_cb = cb.clone(); + + // See crates/fluvio-smartmodule-derive/src/generator/transform.rs for copy_records + let copy_records_fn = move |caller: CallingFrame, + inputs: Vec| + -> Result, HostFuncError> { + assert_eq!(inputs.len(), 2); + let ptr = inputs[0].to_i32() as u32; + let len = inputs[1].to_i32() as u32; + + debug!(len, "callback from wasm filter"); + let caller = Caller::new(caller); + let memory = caller.memory(0).unwrap(); + + let records = RecordsMemory { ptr, len, memory }; + cb.set(records); + Ok(vec![]) + }; + + let import = ImportObjectBuilder::new() + .with_func::<(i32, i32), ()>("copy_records", copy_records_fn) + .map_err(|e| EngineError::Instantiate(e.into()))? + .build("env") + .map_err(|e| EngineError::Instantiate(e.into()))?; + + debug!("instantiating WASMtime"); + store + .register_import_module(executor, &import) + .map_err(|e| EngineError::Instantiate(e.into()))?; + let instance = store + .register_active_module(executor, &module) + .map_err(|e| EngineError::Instantiate(e.into()))?; + + // This is a hack to avoid them being dropped + // FIXME: manage their lifetimes + std::mem::forget(import); + std::mem::forget(module); + + Ok(Self { + instance, + records_cb, + params, + version, + }) + } +} + +pub struct SmartEngine(); + +#[allow(clippy::new_without_default)] +impl SmartEngine { + pub fn new() -> Self { + Self() + } +} + +impl Debug for SmartEngine { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "SmartModuleEngine") + } +} + +/// Building SmartModule +#[derive(Default)] +pub struct SmartModuleChainBuilder { + smart_modules: Vec<(SmartModuleConfig, Vec)>, +} + +impl SmartModuleChainBuilder { + /// Add SmartModule with a single transform and init + pub fn add_smart_module(&mut self, config: SmartModuleConfig, bytes: Vec) { + self.smart_modules.push((config, bytes)) + } + + /// stop adding smartmodule and return SmartModuleChain that can be executed + pub fn initialize(self, _engine: &SmartEngine) -> Result { + let mut executor = Executor::new(None, None).expect("Failed to create WasmEdge executor"); + let mut store = Store::new().expect("Failed to create WasmEdge store"); + let mut ctx = WasmedgeContext { engine: executor }; + + let mut instances = Vec::with_capacity(self.smart_modules.len()); + // let mut state = engine.new_state(); + for (config, bytes) in self.smart_modules { + let module = Module::from_bytes(None, bytes)?; + let version = config.version(); + let mut instance = WasmedgeInstance::instantiate( + &mut store, + &mut ctx.engine, + module, + config.params, + version, + )?; + + // let init = SmartModuleInit::try_instantiate(&ctx, &mut state)?; + let transform = create_transform(&mut instance, &mut ctx, config.initial_data)?; + // instance.init(&mut state)?; + instances.push(SmartModuleInstance { + instance, + transform, + }); + } + + Ok(SmartModuleChainInstance { ctx, instances }) + } +} + +impl>> From<(SmartModuleConfig, T)> for SmartModuleChainBuilder { + fn from(pair: (SmartModuleConfig, T)) -> Self { + let mut result = Self::default(); + result.add_smart_module(pair.0, pair.1.into()); + result + } +} + +/// SmartModule Chain Instance that can be executed +pub struct SmartModuleChainInstance { + ctx: WasmedgeContext, + instances: Vec, +} + +impl Debug for SmartModuleChainInstance { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "SmartModuleChainInstance") + } +} + +impl SmartModuleChainInstance { + /// A single record is processed thru all smartmodules in the chain. + /// The output of one smartmodule is the input of the next smartmodule. + /// A single record may result in multiple records. + /// The output of the last smartmodule is added to the output of the chain. + pub fn process( + &mut self, + input: SmartModuleInput, + metric: &SmartModuleChainMetrics, + ) -> Result { + let raw_len = input.raw_bytes().len(); + debug!(raw_len, "sm raw input"); + metric.add_bytes_in(raw_len as u64); + + let base_offset = input.base_offset(); + + if let Some((last, instances)) = self.instances.split_last_mut() { + let mut next_input = input; + + for instance in instances { + // pass raw inputs to transform instance + // each raw input may result in multiple records + // self.store.top_up_fuel(); + let output = instance.process(next_input, &mut self.ctx)?; + + if output.error.is_some() { + // encountered error, we stop processing and return partial output + return Ok(output); + } else { + next_input = output.successes.try_into()?; + next_input.set_base_offset(base_offset); + } + } + + let output = last.process(next_input, &mut self.ctx)?; + let records_out = output.successes.len(); + metric.add_records_out(records_out as u64); + debug!(records_out, "sm records out"); + Ok(output) + } else { + Ok(SmartModuleOutput::new(input.try_into()?)) + } + } + + #[cfg(test)] + pub(crate) fn instances(&self) -> &Vec { + &self.instances + } +} diff --git a/crates/fluvio-smartengine/src/wasmedge_engine/transforms/filter.rs b/crates/fluvio-smartengine/src/engine/wasmedge_engine/transforms/filter.rs similarity index 72% rename from crates/fluvio-smartengine/src/wasmedge_engine/transforms/filter.rs rename to crates/fluvio-smartengine/src/engine/wasmedge_engine/transforms/filter.rs index 9639cc5cca1..34ac272fa91 100644 --- a/crates/fluvio-smartengine/src/wasmedge_engine/transforms/filter.rs +++ b/crates/fluvio-smartengine/src/engine/wasmedge_engine/transforms/filter.rs @@ -1,66 +1,5 @@ -use std::fmt::Debug; -use std::{convert::TryFrom, sync::Arc}; - -use anyhow::Result; -use wasmedge_sdk::{Engine, Executor, Func, Instance}; - -use fluvio_smartmodule::dataplane::smartmodule::{ - SmartModuleInput, SmartModuleOutput, SmartModuleTransformErrorStatus, -}; - -use crate::wasmedge_engine::instance::SmartModuleInstanceContext; -use crate::wasmedge_engine::SmartModuleTransform; - const FILTER_FN_NAME: &str = "filter"; -type WasmFilterFn = Func; - -pub(crate) struct SmartModuleFilter(WasmFilterFn); - -impl Debug for SmartModuleFilter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "FilterFn") - } -} - -impl SmartModuleFilter { - /// Try to create filter by matching function, if function is not found, then return empty - pub fn try_instantiate(ctx: &SmartModuleInstanceContext) -> Result> { - match ctx.get_wasm_func(FILTER_FN_NAME) { - // check type signature - Some(func) => Ok(Some(Self(func))), - None => Ok(None), - } - } -} - -impl SmartModuleTransform for SmartModuleFilter { - fn process( - &mut self, - input: SmartModuleInput, - ctx: &mut SmartModuleInstanceContext, - engine: &mut Executor, - ) -> Result { - let args = ctx.write_input(&input, engine)?; - // FIXME: SIGSEGV here! - let filter_output = self.0.call(engine, args)?; - let filter_output = filter_output[0].to_i32(); - - if filter_output < 0 { - let internal_error = SmartModuleTransformErrorStatus::try_from(filter_output) - .unwrap_or(SmartModuleTransformErrorStatus::UnknownError); - return Err(internal_error.into()); - } - - let output: SmartModuleOutput = ctx.read_output()?; - Ok(output) - } - - fn name(&self) -> &str { - FILTER_FN_NAME - } -} - #[cfg(test)] mod test { diff --git a/crates/fluvio-smartengine/src/engine/wasmedge_engine/transforms/mod.rs b/crates/fluvio-smartengine/src/engine/wasmedge_engine/transforms/mod.rs new file mode 100644 index 00000000000..37ca7271e13 --- /dev/null +++ b/crates/fluvio-smartengine/src/engine/wasmedge_engine/transforms/mod.rs @@ -0,0 +1,24 @@ +mod filter; + +use crate::{ + error::EngineError, + SmartModuleInitialData, + common::{DowncastableTransform, SimpleTransformImpl}, +}; + +use super::{WasmedgeInstance, WasmedgeContext}; +use anyhow::Result; + +pub(crate) fn create_transform( + instance: &mut WasmedgeInstance, + ctx: &mut WasmedgeContext, + initial_data: SmartModuleInitialData, +) -> Result>> { + if let Some(tr) = SimpleTransformImpl::try_instantiate("filter", instance, ctx)? + .map(|transform| Box::new(transform) as Box>) + { + Ok(tr) + } else { + Err(EngineError::UnknownSmartModule.into()) + } +} diff --git a/crates/fluvio-smartengine/src/lib.rs b/crates/fluvio-smartengine/src/lib.rs index 9037727eb38..5785467dc5b 100644 --- a/crates/fluvio-smartengine/src/lib.rs +++ b/crates/fluvio-smartengine/src/lib.rs @@ -1,40 +1,10 @@ -// TODO: turn on this check when ready -// #[cfg(all(feature = "wasmedge_engine", feature = "wasmtime_engine"))] -// compile_error!( -// "Both `wasmedge_engine` and `wasmtime_engine` features are enabled, but \ -// only one WASM runtime is supported at a time" -// ); - cfg_if::cfg_if! { if #[cfg(feature = "engine")] { - pub(crate) mod memory; - pub(crate) mod transforms; - pub(crate) mod init; - pub(crate) mod error; - pub mod metrics; - mod state; - mod config; mod engine; - pub use config::{SmartModuleConfig, SmartModuleConfigBuilder, SmartModuleConfigBuilderError, SmartModuleInitialData}; - pub mod instance; - - // TODO: move wasmtime engine to a module; or maybe add some common abstraction? - #[cfg(feature = "wasmtime-engine")] - pub use engine::{SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance}; - - #[cfg(feature = "wasmedge-engine")] - mod wasmedge_engine; - #[cfg(feature = "wasmedge-engine")] - mod test_use { - pub use super::wasmedge_engine::{SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance}; - } + pub use engine::*; } } + #[cfg(feature = "transformation")] pub mod transformation; -pub type WasmSlice = (i32, i32, u32); -pub type Version = i16; - -#[cfg(test)] -mod fixture; diff --git a/crates/fluvio-smartengine/src/wasmedge_engine/mod.rs b/crates/fluvio-smartengine/src/wasmedge_engine/mod.rs deleted file mode 100644 index 08bfedabd7a..00000000000 --- a/crates/fluvio-smartengine/src/wasmedge_engine/mod.rs +++ /dev/null @@ -1,148 +0,0 @@ -mod instance; -mod transforms; -use instance::*; -mod init; -use init::*; -mod memory; -use memory::*; - -use tracing::debug; -use wasmedge_sdk::{Executor, Func, Instance, Memory, Module, Store}; - -use crate::config::*; -use crate::error::EngineError; -use crate::metrics::SmartModuleChainMetrics; -use anyhow::Result; -use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput}; -use std::any::Any; -use std::fmt::{self, Debug}; -use std::sync::{Arc, Mutex}; - -use self::transforms::create_transform; - -pub struct SmartEngine(); - -#[allow(clippy::new_without_default)] -impl SmartEngine { - pub fn new() -> Self { - Self() - } -} - -impl Debug for SmartEngine { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "SmartModuleEngine") - } -} - -/// Building SmartModule -#[derive(Default)] -pub struct SmartModuleChainBuilder { - smart_modules: Vec<(SmartModuleConfig, Vec)>, -} - -impl SmartModuleChainBuilder { - /// Add SmartModule with a single transform and init - pub fn add_smart_module(&mut self, config: SmartModuleConfig, bytes: Vec) { - self.smart_modules.push((config, bytes)) - } - - /// stop adding smartmodule and return SmartModuleChain that can be executed - pub fn initialize(self, _engine: &SmartEngine) -> Result { - let mut executor = Executor::new(None, None).expect("Failed to create WasmEdge executor"); - let mut store = Store::new().expect("Failed to create WasmEdge store"); - - let mut instances = Vec::with_capacity(self.smart_modules.len()); - // let mut state = engine.new_state(); - for (config, bytes) in self.smart_modules { - let module = Module::from_bytes(None, bytes)?; - let version = config.version(); - let ctx = SmartModuleInstanceContext::instantiate( - &mut store, - &mut executor, - module, - version, - )?; - // let init = SmartModuleInit::try_instantiate(&ctx, &mut state)?; - let transform = create_transform(&ctx, config.initial_data)?; - let instance = SmartModuleInstance::new(ctx, transform); - // instance.init(&mut state)?; - instances.push(instance); - } - - Ok(SmartModuleChainInstance { - engine: executor, - instances, - }) - } -} - -impl>> From<(SmartModuleConfig, T)> for SmartModuleChainBuilder { - fn from(pair: (SmartModuleConfig, T)) -> Self { - let mut result = Self::default(); - result.add_smart_module(pair.0, pair.1.into()); - result - } -} - -/// SmartModule Chain Instance that can be executed -pub struct SmartModuleChainInstance { - engine: Executor, - instances: Vec, -} - -impl Debug for SmartModuleChainInstance { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "SmartModuleChainInstance") - } -} - -impl SmartModuleChainInstance { - /// A single record is processed thru all smartmodules in the chain. - /// The output of one smartmodule is the input of the next smartmodule. - /// A single record may result in multiple records. - /// The output of the last smartmodule is added to the output of the chain. - pub fn process( - &mut self, - input: SmartModuleInput, - metric: &SmartModuleChainMetrics, - ) -> Result { - let raw_len = input.raw_bytes().len(); - debug!(raw_len, "sm raw input"); - metric.add_bytes_in(raw_len as u64); - - let base_offset = input.base_offset(); - - if let Some((last, instances)) = self.instances.split_last_mut() { - let mut next_input = input; - - for instance in instances { - // pass raw inputs to transform instance - // each raw input may result in multiple records - // self.store.top_up_fuel(); - let output = instance.process(next_input, &mut self.engine)?; - - if output.error.is_some() { - // encountered error, we stop processing and return partial output - return Ok(output); - } else { - next_input = output.successes.try_into()?; - next_input.set_base_offset(base_offset); - } - } - - let output = last.process(next_input, &mut self.engine)?; - let records_out = output.successes.len(); - metric.add_records_out(records_out as u64); - debug!(records_out, "sm records out"); - Ok(output) - } else { - Ok(SmartModuleOutput::new(input.try_into()?)) - } - } - - #[cfg(test)] - pub(crate) fn instances(&self) -> &Vec { - &self.instances - } -} diff --git a/crates/fluvio-smartengine/src/wasmedge_engine/transforms/mod.rs b/crates/fluvio-smartengine/src/wasmedge_engine/transforms/mod.rs deleted file mode 100644 index cf10f5d2499..00000000000 --- a/crates/fluvio-smartengine/src/wasmedge_engine/transforms/mod.rs +++ /dev/null @@ -1,21 +0,0 @@ -mod filter; - -use crate::{error::EngineError, SmartModuleInitialData}; - -use self::filter::SmartModuleFilter; -use super::instance::{DowncastableTransform, SmartModuleInstanceContext}; -use anyhow::Result; - -pub(crate) fn create_transform( - ctx: &SmartModuleInstanceContext, - initial_data: SmartModuleInitialData, - // store: &mut impl AsContextMut, -) -> Result> { - if let Some(tr) = SmartModuleFilter::try_instantiate(ctx)? - .map(|transform| Box::new(transform) as Box) - { - Ok(tr) - } else { - Err(EngineError::UnknownSmartModule.into()) - } -}