Skip to content

Commit

Permalink
make create_transform common & reorg trait imp to imp
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Apr 18, 2023
1 parent 11e1751 commit abd4fb6
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 274 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

149 changes: 54 additions & 95 deletions crates/fluvio-smartengine/src/engine/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleExtraParams,
};

pub trait WasmInstance {
use crate::SmartModuleInitialData;

use super::error::EngineError;

pub(crate) trait WasmInstance {
type Context;
type Func: WasmFn<Context = Self::Context>;
type Func: WasmFn<Context = Self::Context> + Send + Sync + 'static;

fn params(&self) -> SmartModuleExtraParams;

Expand All @@ -26,12 +30,12 @@ pub trait WasmInstance {

/// All smartmodule wasm functions have the same ABI:
/// `(ptr: *mut u8, len: usize, version: i16) -> i32`, which is `(param i32 i32 i32) (result i32)` in wasm.
pub trait WasmFn {
pub(crate) trait WasmFn {
type Context;
fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result<i32>;
}

pub trait SmartModuleTransform<I: WasmInstance>: Send + Sync {
pub(crate) trait SmartModuleTransform<I: WasmInstance>: Send + Sync {
/// transform records
fn process(
&mut self,
Expand All @@ -57,13 +61,13 @@ impl<T: SmartModuleTransform<I> + Any, I: WasmInstance> DowncastableTransform<I>
}
}

pub struct SimpleTransformImpl<F: WasmFn + Send + Sync> {
pub(crate) struct SimpleTransform<F: WasmFn + Send + Sync> {
name: String,
func: F,
}

impl<I: WasmInstance, F: WasmFn<Context = I::Context> + Send + Sync> SmartModuleTransform<I>
for SimpleTransformImpl<F>
for SimpleTransform<F>
{
fn process(
&mut self,
Expand All @@ -89,15 +93,15 @@ impl<I: WasmInstance, F: WasmFn<Context = I::Context> + Send + Sync> SmartModule
}
}

impl<F: WasmFn + Send + Sync> SimpleTransformImpl<F> {
pub(crate) fn try_instantiate<I>(
impl<F: WasmFn + Send + Sync> SimpleTransform<F> {
pub(crate) fn try_instantiate<I, C>(
name: &str,
instance: &mut I,
ctx: &mut <I as WasmInstance>::Context,
ctx: &mut C,
) -> Result<Option<Self>>
where
I: WasmInstance<Func = F>,
F: WasmFn<Context = I::Context>,
I: WasmInstance<Func = F, Context = C>,
F: WasmFn<Context = C>,
{
let func = instance
.get_fn(name, ctx)?
Expand All @@ -109,6 +113,45 @@ impl<F: WasmFn + Send + Sync> SimpleTransformImpl<F> {
}
}

pub(crate) const FILTER_FN_NAME: &str = "filter";
pub(crate) const MAP_FN_NAME: &str = "map";
pub(crate) const FILTER_MAP_FN_NAME: &str = "filter_map";
pub(crate) const ARRAY_MAP_FN_NAME: &str = "array_map";

pub(crate) fn create_transform<I, C>(
instance: &mut I,
ctx: &mut C,
_initial_data: SmartModuleInitialData,
) -> Result<Box<dyn DowncastableTransform<I>>>
where
I: WasmInstance<Context = C>,
{
if let Some(tr) = SimpleTransform::try_instantiate(FILTER_FN_NAME, instance, ctx)?
.map(|transform| Box::new(transform) as Box<dyn DowncastableTransform<I>>)
{
Ok(tr)
} else if let Some(tr) = SimpleTransform::try_instantiate(MAP_FN_NAME, instance, ctx)?
.map(|transform| Box::new(transform) as Box<dyn DowncastableTransform<I>>)
{
Ok(tr)
} else if let Some(tr) = SimpleTransform::try_instantiate(FILTER_MAP_FN_NAME, instance, ctx)?
.map(|transform| Box::new(transform) as Box<dyn DowncastableTransform<I>>)
{
Ok(tr)
} else if let Some(tr) = SimpleTransform::try_instantiate(ARRAY_MAP_FN_NAME, instance, ctx)?
.map(|transform| Box::new(transform) as Box<dyn DowncastableTransform<I>>)
{
Ok(tr)
// TODO: AGGREGATE
// } else if let Some(tr) = SmartModuleAggregate::try_instantiate(ctx, initial_data, store)?
// .map(|transform| Box::new(transform) as Box<dyn DowncastableTransform<I>>)
// {
// Ok(tr)
} else {
Err(EngineError::UnknownSmartModule.into())
}
}

pub(crate) const INIT_FN_NAME: &str = "init";

pub(crate) struct SmartModuleInit<F: WasmFn>(F);
Expand Down Expand Up @@ -220,87 +263,3 @@ impl<I: WasmInstance<Func = F>, F: WasmFn + Send + Sync> SmartModuleInstance<I,
}
}
}

mod wasmtime {
use anyhow::Result;
use fluvio_protocol::{Encoder, Decoder};
use tracing::debug;

use std::sync::Arc;

use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
use wasmtime::{Instance, Store};

use crate::engine::wasmtime::instance::RecordsCallBack;

pub struct WasmTimeInstance {
instance: Instance,
records_cb: Arc<RecordsCallBack>,
params: SmartModuleExtraParams,
version: i16,
}
pub struct WasmTimeContext {
store: Store<()>,
}

pub type WasmTimeFn = wasmtime::TypedFunc<(i32, i32, i32), i32>;

impl super::WasmInstance for WasmTimeInstance {
type Context = WasmTimeContext;

type Func = WasmTimeFn;

fn get_fn(&self, name: &str, ctx: &mut Self::Context) -> Result<Option<Self::Func>> {
match self.instance.get_func(&mut ctx.store, name) {
Some(func) => {
// check type signature
func.typed(&mut ctx.store)
.or_else(|_| func.typed(&ctx.store))
.map(|f| Some(f))
}
None => Ok(None),
}
}

fn write_input<E: Encoder>(
&mut self,
input: &E,
ctx: &mut Self::Context,
) -> anyhow::Result<(i32, i32, i32)> {
self.records_cb.clear();
let mut input_data = Vec::new();
input.encode(&mut input_data, self.version)?;
debug!(
len = input_data.len(),
version = self.version,
"input encoded"
);
let array_ptr = crate::engine::wasmtime::memory::copy_memory_to_instance(
&mut ctx.store,
&self.instance,
&input_data,
)?;
let length = input_data.len();
Ok((array_ptr as i32, length as i32, self.version as i32))
}

fn read_output<D: Decoder + Default>(&mut self, ctx: &mut Self::Context) -> Result<D> {
let bytes = self
.records_cb
.get()
.and_then(|m| m.copy_memory_from(&ctx.store).ok())
.unwrap_or_default();
let mut output = D::default();
output.decode(&mut std::io::Cursor::new(bytes), self.version)?;
Ok(output)
}
}

impl super::WasmFn for WasmTimeFn {
type Context = WasmTimeContext;

fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result<i32> {
WasmTimeFn::call(self, &mut ctx.store, (ptr, len, version))
}
}
}
153 changes: 153 additions & 0 deletions crates/fluvio-smartengine/src/engine/wasmedge/imp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use tracing::debug;
use wasmedge_sdk::error::HostFuncError;
use wasmedge_sdk::types::Val;
use wasmedge_sdk::{
Executor, Func, Module, Store, CallingFrame, WasmValue, Caller, ImportObjectBuilder,
};

use crate::engine::wasmedge::instance::{RecordsCallBack, RecordsMemory};

use crate::engine::common::{WasmFn, WasmInstance};
use crate::engine::error::EngineError;

use anyhow::Result;
use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleExtraParams};
use std::sync::Arc;

pub(crate) struct WasmedgeInstance {
instance: wasmedge_sdk::Instance,
records_cb: Arc<super::instance::RecordsCallBack>,
params: SmartModuleExtraParams,
version: i16,
}

pub(crate) struct WasmedgeContext {
pub engine: Executor,
}

pub type WasmedgeFn = Func;

impl WasmInstance for WasmedgeInstance {
type Context = WasmedgeContext;

type Func = WasmedgeFn;

fn get_fn(&self, name: &str, _ctx: &mut Self::Context) -> Result<Option<Self::Func>> {
match self.instance.func(name) {
// check type signature
Some(func) => Ok(Some(func)),
None => Ok(None),
}
}

fn write_input<E: fluvio_protocol::Encoder>(
&mut self,
input: &E,
ctx: &mut Self::Context,
) -> Result<(i32, i32, i32)> {
self.records_cb.clear();
let mut input_data = Vec::new();
input.encode(&mut input_data, self.version)?;
debug!(
len = input_data.len(),
version = self.version,
"input encoded"
);
let array_ptr =
super::memory::copy_memory_to_instance(&ctx.engine, &self.instance, &input_data)?;
let length = input_data.len();
Ok((array_ptr, length as i32, self.version as i32))
}

fn read_output<D: fluvio_protocol::Decoder + Default>(
&mut self,
_ctx: &mut Self::Context,
) -> Result<D> {
let bytes = self
.records_cb
.get()
.and_then(|m| m.copy_memory_from().ok())
.unwrap_or_default();
let mut output = D::default();
output.decode(&mut std::io::Cursor::new(bytes), self.version)?;
Ok(output)
}

fn params(&self) -> SmartModuleExtraParams {
self.params.clone()
}
}

impl WasmFn for WasmedgeFn {
type Context = WasmedgeContext;

fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result<i32> {
let res = self.call(
&ctx.engine,
vec![
Val::I32(ptr).into(),
Val::I32(len).into(),
Val::I32(version).into(),
],
)?;
Ok(res[0].to_i32())
}
}

impl WasmedgeInstance {
/// instantiate new module instance that contain context
pub(crate) fn instantiate(
store: &mut Store,
executor: &mut Executor,
module: Module,
params: SmartModuleExtraParams,
version: i16,
) -> Result<Self, EngineError> {
debug!("creating WasmModuleInstance");
let cb = Arc::new(RecordsCallBack::new());
let records_cb = cb.clone();

// See crates/fluvio-smartmodule-derive/src/generator/transform.rs for copy_records
let copy_records_fn = move |caller: CallingFrame,
inputs: Vec<WasmValue>|
-> Result<Vec<WasmValue>, HostFuncError> {
assert_eq!(inputs.len(), 2);
let ptr = inputs[0].to_i32() as u32;
let len = inputs[1].to_i32() as u32;

debug!(len, "callback from wasm filter");
let caller = Caller::new(caller);
let memory = caller.memory(0).unwrap();

let records = RecordsMemory { ptr, len, memory };
cb.set(records);
Ok(vec![])
};

let import = ImportObjectBuilder::new()
.with_func::<(i32, i32), ()>("copy_records", copy_records_fn)
.map_err(|e| EngineError::Instantiate(e.into()))?
.build("env")
.map_err(|e| EngineError::Instantiate(e.into()))?;

debug!("instantiating WASMtime");
store
.register_import_module(executor, &import)
.map_err(|e| EngineError::Instantiate(e.into()))?;
let instance = store
.register_active_module(executor, &module)
.map_err(|e| EngineError::Instantiate(e.into()))?;

// This is a hack to avoid them being dropped
// FIXME: manage their lifetimes
std::mem::forget(import);
std::mem::forget(module);

Ok(Self {
instance,
records_cb,
params,
version,
})
}
}
11 changes: 11 additions & 0 deletions crates/fluvio-smartengine/src/engine/wasmedge/init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use std::convert::TryFrom;
use std::fmt::Debug;

use anyhow::{Result, Ok};
use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInitInput, SmartModuleInitOutput, SmartModuleInitErrorStatus,
};

use crate::engine::common::{WasmFn, WasmInstance};

use super::instance::SmartModuleInstanceContext;
Loading

0 comments on commit abd4fb6

Please sign in to comment.