Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support WasmEdge as an alternative engine #1

Draft
wants to merge 33 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
fb5b2fe
init work: make the public API clear
xxchan Feb 26, 2023
7a30006
implement filter transform
xxchan Feb 26, 2023
685ae36
wip
xxchan Apr 9, 2023
15686cc
make the public API clear
xxchan Feb 26, 2023
1937438
move wasmtime_engine to a different mod
xxchan Apr 15, 2023
6f1604e
fix wasi feature
xxchan Apr 15, 2023
6b88555
rename wasmtime-engine -> wasmtime
xxchan Apr 15, 2023
cd038a3
Merge branch 'master' into xxchan/clena
xxchan Apr 18, 2023
86326da
reorg
xxchan Apr 18, 2023
e959a9d
Merge branch 'xxchan/clena' into xxchan/wasmedge
xxchan Apr 18, 2023
f17b8ef
support init for wasmedge/common
xxchan Apr 18, 2023
8b11252
Merge branch 'master' into xxchan/wasmedge
xxchan Apr 18, 2023
11e1751
make SmartModuleInstance common
xxchan Apr 18, 2023
abd4fb6
make create_transform common & reorg trait imp to imp
xxchan Apr 18, 2023
66bcbdd
more movement
xxchan Apr 18, 2023
0132522
Merge branch 'master' into xxchan/wasmedge
xxchan Apr 29, 2023
da7a2e2
add other tests for wasmedge
xxchan Apr 29, 2023
26d9053
support agg for common/wasmedge
xxchan Apr 29, 2023
56de150
movement
xxchan Apr 29, 2023
14c4c3c
refactor wasmtime to use the common code
xxchan Apr 29, 2023
3d8f4bf
WasmTime -> Wasmtime
xxchan Apr 29, 2023
d0bb940
move transform unit tests to common
xxchan Apr 29, 2023
7924834
rename Wasmedge -> WasmEdge
xxchan Apr 29, 2023
f2f55e8
minor tweaks
xxchan Apr 29, 2023
e6f7e2e
change features
xxchan May 15, 2023
06d579f
rm wasmedge
xxchan May 31, 2023
6ff943b
Merge branch 'master' into xxchan/wasmedge
xxchan May 31, 2023
7e559c1
remove wasmedge
xxchan May 31, 2023
7d23e1a
fmt
xxchan May 31, 2023
f5650e3
fmt with group_imports = "StdExternalCrate"
xxchan May 31, 2023
fe98b8f
try to make diff smaller
xxchan May 31, 2023
136c572
make diff smaller
xxchan May 31, 2023
72f4ed7
clippy
xxchan May 31, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make create_transform common & reorg trait imp to imp
  • Loading branch information
xxchan committed Apr 18, 2023
commit abd4fb61fb6df8e9858059a75569aaeb757dfd7e
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

149 changes: 54 additions & 95 deletions crates/fluvio-smartengine/src/engine/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleExtraParams,
};

pub trait WasmInstance {
use crate::SmartModuleInitialData;

use super::error::EngineError;

pub(crate) trait WasmInstance {
type Context;
type Func: WasmFn<Context = Self::Context>;
type Func: WasmFn<Context = Self::Context> + Send + Sync + 'static;

fn params(&self) -> SmartModuleExtraParams;

Expand All @@ -26,12 +30,12 @@ pub trait WasmInstance {

/// All smartmodule wasm functions have the same ABI:
/// `(ptr: *mut u8, len: usize, version: i16) -> i32`, which is `(param i32 i32 i32) (result i32)` in wasm.
pub trait WasmFn {
pub(crate) trait WasmFn {
type Context;
fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result<i32>;
}

pub trait SmartModuleTransform<I: WasmInstance>: Send + Sync {
pub(crate) trait SmartModuleTransform<I: WasmInstance>: Send + Sync {
/// transform records
fn process(
&mut self,
Expand All @@ -57,13 +61,13 @@ impl<T: SmartModuleTransform<I> + Any, I: WasmInstance> DowncastableTransform<I>
}
}

pub struct SimpleTransformImpl<F: WasmFn + Send + Sync> {
pub(crate) struct SimpleTransform<F: WasmFn + Send + Sync> {
name: String,
func: F,
}

impl<I: WasmInstance, F: WasmFn<Context = I::Context> + Send + Sync> SmartModuleTransform<I>
for SimpleTransformImpl<F>
for SimpleTransform<F>
{
fn process(
&mut self,
Expand All @@ -89,15 +93,15 @@ impl<I: WasmInstance, F: WasmFn<Context = I::Context> + Send + Sync> SmartModule
}
}

impl<F: WasmFn + Send + Sync> SimpleTransformImpl<F> {
pub(crate) fn try_instantiate<I>(
impl<F: WasmFn + Send + Sync> SimpleTransform<F> {
pub(crate) fn try_instantiate<I, C>(
name: &str,
instance: &mut I,
ctx: &mut <I as WasmInstance>::Context,
ctx: &mut C,
) -> Result<Option<Self>>
where
I: WasmInstance<Func = F>,
F: WasmFn<Context = I::Context>,
I: WasmInstance<Func = F, Context = C>,
F: WasmFn<Context = C>,
{
let func = instance
.get_fn(name, ctx)?
Expand All @@ -109,6 +113,45 @@ impl<F: WasmFn + Send + Sync> SimpleTransformImpl<F> {
}
}

pub(crate) const FILTER_FN_NAME: &str = "filter";
pub(crate) const MAP_FN_NAME: &str = "map";
pub(crate) const FILTER_MAP_FN_NAME: &str = "filter_map";
pub(crate) const ARRAY_MAP_FN_NAME: &str = "array_map";

pub(crate) fn create_transform<I, C>(
instance: &mut I,
ctx: &mut C,
_initial_data: SmartModuleInitialData,
) -> Result<Box<dyn DowncastableTransform<I>>>
where
I: WasmInstance<Context = C>,
{
if let Some(tr) = SimpleTransform::try_instantiate(FILTER_FN_NAME, instance, ctx)?
.map(|transform| Box::new(transform) as Box<dyn DowncastableTransform<I>>)
{
Ok(tr)
} else if let Some(tr) = SimpleTransform::try_instantiate(MAP_FN_NAME, instance, ctx)?
.map(|transform| Box::new(transform) as Box<dyn DowncastableTransform<I>>)
{
Ok(tr)
} else if let Some(tr) = SimpleTransform::try_instantiate(FILTER_MAP_FN_NAME, instance, ctx)?
.map(|transform| Box::new(transform) as Box<dyn DowncastableTransform<I>>)
{
Ok(tr)
} else if let Some(tr) = SimpleTransform::try_instantiate(ARRAY_MAP_FN_NAME, instance, ctx)?
.map(|transform| Box::new(transform) as Box<dyn DowncastableTransform<I>>)
{
Ok(tr)
// TODO: AGGREGATE
// } else if let Some(tr) = SmartModuleAggregate::try_instantiate(ctx, initial_data, store)?
// .map(|transform| Box::new(transform) as Box<dyn DowncastableTransform<I>>)
// {
// Ok(tr)
} else {
Err(EngineError::UnknownSmartModule.into())
}
}

pub(crate) const INIT_FN_NAME: &str = "init";

pub(crate) struct SmartModuleInit<F: WasmFn>(F);
Expand Down Expand Up @@ -220,87 +263,3 @@ impl<I: WasmInstance<Func = F>, F: WasmFn + Send + Sync> SmartModuleInstance<I,
}
}
}

mod wasmtime {
use anyhow::Result;
use fluvio_protocol::{Encoder, Decoder};
use tracing::debug;

use std::sync::Arc;

use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
use wasmtime::{Instance, Store};

use crate::engine::wasmtime::instance::RecordsCallBack;

pub struct WasmTimeInstance {
instance: Instance,
records_cb: Arc<RecordsCallBack>,
params: SmartModuleExtraParams,
version: i16,
}
pub struct WasmTimeContext {
store: Store<()>,
}

pub type WasmTimeFn = wasmtime::TypedFunc<(i32, i32, i32), i32>;

impl super::WasmInstance for WasmTimeInstance {
type Context = WasmTimeContext;

type Func = WasmTimeFn;

fn get_fn(&self, name: &str, ctx: &mut Self::Context) -> Result<Option<Self::Func>> {
match self.instance.get_func(&mut ctx.store, name) {
Some(func) => {
// check type signature
func.typed(&mut ctx.store)
.or_else(|_| func.typed(&ctx.store))
.map(|f| Some(f))
}
None => Ok(None),
}
}

fn write_input<E: Encoder>(
&mut self,
input: &E,
ctx: &mut Self::Context,
) -> anyhow::Result<(i32, i32, i32)> {
self.records_cb.clear();
let mut input_data = Vec::new();
input.encode(&mut input_data, self.version)?;
debug!(
len = input_data.len(),
version = self.version,
"input encoded"
);
let array_ptr = crate::engine::wasmtime::memory::copy_memory_to_instance(
&mut ctx.store,
&self.instance,
&input_data,
)?;
let length = input_data.len();
Ok((array_ptr as i32, length as i32, self.version as i32))
}

fn read_output<D: Decoder + Default>(&mut self, ctx: &mut Self::Context) -> Result<D> {
let bytes = self
.records_cb
.get()
.and_then(|m| m.copy_memory_from(&ctx.store).ok())
.unwrap_or_default();
let mut output = D::default();
output.decode(&mut std::io::Cursor::new(bytes), self.version)?;
Ok(output)
}
}

impl super::WasmFn for WasmTimeFn {
type Context = WasmTimeContext;

fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result<i32> {
WasmTimeFn::call(self, &mut ctx.store, (ptr, len, version))
}
}
}
153 changes: 153 additions & 0 deletions crates/fluvio-smartengine/src/engine/wasmedge/imp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use tracing::debug;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you reorder terms in of:

  • std
  • third party
  • fluvio dep

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's OK. Are you going to review the PR now? I will resolve such issues if general ideas LGTY.

P.S. this PR is generally complete except minor points.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you split this into PR just related to without WasmEdge? We need to discuss how to maintain WasmEdge related code.

Thanks

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's OK. But I'd like to elaborate a bit: only folder wasmedge is WasmEdge-specific code, and that's ~300 lines of simple code. To discuss how to maintain WasmEdge related code, and to get the ideas of how the common code work, it might be better to keep WasmEdge related code now. This can actually make it easier to review this PR.

I can remove the WasmEdge related code after the code review if that's not decided.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like have review by team but can't seem to add reviewers

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created a PR to the main repo infinyon#3257

use wasmedge_sdk::error::HostFuncError;
use wasmedge_sdk::types::Val;
use wasmedge_sdk::{
Executor, Func, Module, Store, CallingFrame, WasmValue, Caller, ImportObjectBuilder,
};

use crate::engine::wasmedge::instance::{RecordsCallBack, RecordsMemory};

use crate::engine::common::{WasmFn, WasmInstance};
use crate::engine::error::EngineError;

use anyhow::Result;
use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleExtraParams};
use std::sync::Arc;

pub(crate) struct WasmedgeInstance {
instance: wasmedge_sdk::Instance,
records_cb: Arc<super::instance::RecordsCallBack>,
params: SmartModuleExtraParams,
version: i16,
}

pub(crate) struct WasmedgeContext {
pub engine: Executor,
}

pub type WasmedgeFn = Func;

impl WasmInstance for WasmedgeInstance {
type Context = WasmedgeContext;

type Func = WasmedgeFn;

fn get_fn(&self, name: &str, _ctx: &mut Self::Context) -> Result<Option<Self::Func>> {
match self.instance.func(name) {
// check type signature
Some(func) => Ok(Some(func)),
None => Ok(None),
}
}

fn write_input<E: fluvio_protocol::Encoder>(
&mut self,
input: &E,
ctx: &mut Self::Context,
) -> Result<(i32, i32, i32)> {
self.records_cb.clear();
let mut input_data = Vec::new();
input.encode(&mut input_data, self.version)?;
debug!(
len = input_data.len(),
version = self.version,
"input encoded"
);
let array_ptr =
super::memory::copy_memory_to_instance(&ctx.engine, &self.instance, &input_data)?;
let length = input_data.len();
Ok((array_ptr, length as i32, self.version as i32))
}

fn read_output<D: fluvio_protocol::Decoder + Default>(
&mut self,
_ctx: &mut Self::Context,
) -> Result<D> {
let bytes = self
.records_cb
.get()
.and_then(|m| m.copy_memory_from().ok())
.unwrap_or_default();
let mut output = D::default();
output.decode(&mut std::io::Cursor::new(bytes), self.version)?;
Ok(output)
}

fn params(&self) -> SmartModuleExtraParams {
self.params.clone()
}
}

impl WasmFn for WasmedgeFn {
type Context = WasmedgeContext;

fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result<i32> {
let res = self.call(
&ctx.engine,
vec![
Val::I32(ptr).into(),
Val::I32(len).into(),
Val::I32(version).into(),
],
)?;
Ok(res[0].to_i32())
}
}

impl WasmedgeInstance {
/// instantiate new module instance that contain context
pub(crate) fn instantiate(
store: &mut Store,
executor: &mut Executor,
module: Module,
params: SmartModuleExtraParams,
version: i16,
) -> Result<Self, EngineError> {
debug!("creating WasmModuleInstance");
let cb = Arc::new(RecordsCallBack::new());
let records_cb = cb.clone();

// See crates/fluvio-smartmodule-derive/src/generator/transform.rs for copy_records
let copy_records_fn = move |caller: CallingFrame,
inputs: Vec<WasmValue>|
-> Result<Vec<WasmValue>, HostFuncError> {
assert_eq!(inputs.len(), 2);
let ptr = inputs[0].to_i32() as u32;
let len = inputs[1].to_i32() as u32;

debug!(len, "callback from wasm filter");
let caller = Caller::new(caller);
let memory = caller.memory(0).unwrap();

let records = RecordsMemory { ptr, len, memory };
cb.set(records);
Ok(vec![])
};

let import = ImportObjectBuilder::new()
.with_func::<(i32, i32), ()>("copy_records", copy_records_fn)
.map_err(|e| EngineError::Instantiate(e.into()))?
.build("env")
.map_err(|e| EngineError::Instantiate(e.into()))?;

debug!("instantiating WASMtime");
store
.register_import_module(executor, &import)
.map_err(|e| EngineError::Instantiate(e.into()))?;
let instance = store
.register_active_module(executor, &module)
.map_err(|e| EngineError::Instantiate(e.into()))?;

// This is a hack to avoid them being dropped
// FIXME: manage their lifetimes
std::mem::forget(import);
std::mem::forget(module);

Ok(Self {
instance,
records_cb,
params,
version,
})
}
}
11 changes: 11 additions & 0 deletions crates/fluvio-smartengine/src/engine/wasmedge/init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use std::convert::TryFrom;
use std::fmt::Debug;

use anyhow::{Result, Ok};
use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInitInput, SmartModuleInitOutput, SmartModuleInitErrorStatus,
};

use crate::engine::common::{WasmFn, WasmInstance};

use super::instance::SmartModuleInstanceContext;
Loading