Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support WasmEdge as an alternative engine #1

Draft
wants to merge 33 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
fb5b2fe
init work: make the public API clear
xxchan Feb 26, 2023
7a30006
implement filter transform
xxchan Feb 26, 2023
685ae36
wip
xxchan Apr 9, 2023
15686cc
make the public API clear
xxchan Feb 26, 2023
1937438
move wasmtime_engine to a different mod
xxchan Apr 15, 2023
6f1604e
fix wasi feature
xxchan Apr 15, 2023
6b88555
rename wasmtime-engine -> wasmtime
xxchan Apr 15, 2023
cd038a3
Merge branch 'master' into xxchan/clena
xxchan Apr 18, 2023
86326da
reorg
xxchan Apr 18, 2023
e959a9d
Merge branch 'xxchan/clena' into xxchan/wasmedge
xxchan Apr 18, 2023
f17b8ef
support init for wasmedge/common
xxchan Apr 18, 2023
8b11252
Merge branch 'master' into xxchan/wasmedge
xxchan Apr 18, 2023
11e1751
make SmartModuleInstance common
xxchan Apr 18, 2023
abd4fb6
make create_transform common & reorg trait imp to imp
xxchan Apr 18, 2023
66bcbdd
more movement
xxchan Apr 18, 2023
0132522
Merge branch 'master' into xxchan/wasmedge
xxchan Apr 29, 2023
da7a2e2
add other tests for wasmedge
xxchan Apr 29, 2023
26d9053
support agg for common/wasmedge
xxchan Apr 29, 2023
56de150
movement
xxchan Apr 29, 2023
14c4c3c
refactor wasmtime to use the common code
xxchan Apr 29, 2023
3d8f4bf
WasmTime -> Wasmtime
xxchan Apr 29, 2023
d0bb940
move transform unit tests to common
xxchan Apr 29, 2023
7924834
rename Wasmedge -> WasmEdge
xxchan Apr 29, 2023
f2f55e8
minor tweaks
xxchan Apr 29, 2023
e6f7e2e
change features
xxchan May 15, 2023
06d579f
rm wasmedge
xxchan May 31, 2023
6ff943b
Merge branch 'master' into xxchan/wasmedge
xxchan May 31, 2023
7e559c1
remove wasmedge
xxchan May 31, 2023
7d23e1a
fmt
xxchan May 31, 2023
f5650e3
fmt with group_imports = "StdExternalCrate"
xxchan May 31, 2023
fe98b8f
try to make diff smaller
xxchan May 31, 2023
136c572
make diff smaller
xxchan May 31, 2023
72f4ed7
clippy
xxchan May 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make SmartModuleInstance common
  • Loading branch information
xxchan committed Apr 18, 2023
commit 11e175105c813ef6c852ab49d57699b89f59d472
57 changes: 57 additions & 0 deletions crates/fluvio-smartengine/src/engine/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ use fluvio_protocol::{Encoder, Decoder};
use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInput, SmartModuleOutput, SmartModuleTransformErrorStatus,
SmartModuleInitErrorStatus, SmartModuleInitOutput, SmartModuleInitInput,
SmartModuleExtraParams,
};

pub trait WasmInstance {
type Context;
type Func: WasmFn<Context = Self::Context>;

fn params(&self) -> SmartModuleExtraParams;

fn get_fn(&self, name: &str, ctx: &mut Self::Context) -> Result<Option<Self::Func>>;

fn write_input<E: Encoder>(
Expand Down Expand Up @@ -164,6 +167,60 @@ impl<F: WasmFn + Send + Sync> SmartModuleInit<F> {
}
}

pub(crate) struct SmartModuleInstance<I: WasmInstance<Func = F>, F: WasmFn> {
pub instance: I,
pub transform: Box<dyn DowncastableTransform<I>>,
pub init: Option<SmartModuleInit<F>>,
}

impl<I: WasmInstance<Func = F>, F: WasmFn + Send + Sync> SmartModuleInstance<I, F> {
#[cfg(test)]
#[allow(clippy::borrowed_box)]
pub(crate) fn transform(&self) -> &Box<dyn DowncastableTransform<I>> {
&self.transform
}

#[cfg(test)]
pub(crate) fn get_init(&self) -> &Option<SmartModuleInit<F>> {
&self.init
}

pub(crate) fn new(
instance: I,
init: Option<SmartModuleInit<F>>,
transform: Box<dyn DowncastableTransform<I>>,
) -> Self {
Self {
instance,
init,
transform,
}
}

pub(crate) fn process(
&mut self,
input: SmartModuleInput,
ctx: &mut I::Context,
) -> Result<SmartModuleOutput> {
self.transform.process(input, &mut self.instance, ctx)
}

pub fn init<C>(&mut self, ctx: &mut I::Context) -> Result<()>
where
I: WasmInstance<Context = C>,
F: WasmFn<Context = C>,
{
if let Some(init) = &mut self.init {
let input = SmartModuleInitInput {
params: self.instance.params(),
};
init.initialize(input, &mut self.instance, ctx)
} else {
Ok(())
}
}
}

mod wasmtime {
use anyhow::Result;
use fluvio_protocol::{Encoder, Decoder};
Expand Down
11 changes: 5 additions & 6 deletions crates/fluvio-smartengine/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl SmartModuleChainBuilder {
}

pub fn initialize(self, engine: &SmartEngine) -> anyhow::Result<SmartModuleChainInstance> {
initialize_imp(self, engine)
initialize_imp(self, &engine.inner).map(|inner| SmartModuleChainInstance { inner })
}
}

Expand Down Expand Up @@ -97,16 +97,15 @@ cfg_if::cfg_if! {


if #[cfg(feature = "wasmtime-engine")] {
mod wasmtime;
use self::wasmtime::{SmartEngineImp, initialize_imp, SmartModuleChainInstanceImp};
// mod wasmtime;
// use self::wasmtime::{SmartEngineImp, initialize_imp, SmartModuleChainInstanceImp};
}
}

cfg_if::cfg_if! {
if #[cfg(feature = "wasmedge-engine")] {
mod wasmedge;
mod test_use {
pub use super::wasmedge::{SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance};
}
use self::wasmedge::{SmartEngineImp, initialize_imp, SmartModuleChainInstanceImp};

}
}
77 changes: 2 additions & 75 deletions crates/fluvio-smartengine/src/engine/wasmedge/instance.rs
Original file line number Diff line number Diff line change
@@ -1,79 +1,6 @@
use fluvio_protocol::{Decoder, Encoder};
use tracing::debug;
use wasmedge_sdk::error::HostFuncError;
use wasmedge_sdk::types::Val;
use wasmedge_sdk::{
host_function, Caller, CallingFrame, Engine, Executor, Func, ImportObjectBuilder, Instance,
Memory, Module, Store, WasmValue,
};

use super::{WasmedgeInstance, WasmedgeContext};
use crate::engine::common::SmartModuleInit;
use crate::engine::common::DowncastableTransform;
use crate::engine::error::EngineError;
use crate::metrics::SmartModuleChainMetrics;
use crate::engine::wasmedge::memory;
use crate::engine::{config::*, WasmSlice};
use wasmedge_sdk::Memory;
use anyhow::Result;
use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInput, SmartModuleOutput, SmartModuleInitInput, SmartModuleExtraParams,
};
use std::any::Any;
use std::fmt::{self, Debug};
use std::sync::{Arc, Mutex};
use super::WasmedgeFn;

use super::Init;

pub(crate) struct SmartModuleInstance {
pub instance: WasmedgeInstance,
pub transform: Box<dyn DowncastableTransform<WasmedgeInstance>>,
pub init: Option<Init>,
}

impl SmartModuleInstance {
#[cfg(test)]
#[allow(clippy::borrowed_box)]
pub(crate) fn transform(&self) -> &Box<dyn DowncastableTransform<WasmedgeInstance>> {
&self.transform
}

#[cfg(test)]
pub(crate) fn get_init(&self) -> &Option<Init> {
&self.init
}

pub(crate) fn new(
instance: WasmedgeInstance,
init: Option<Init>,
transform: Box<dyn DowncastableTransform<WasmedgeInstance>>,
) -> Self {
Self {
instance,
init,
transform,
}
}

pub(crate) fn process(
&mut self,
input: SmartModuleInput,
ctx: &mut WasmedgeContext,
) -> Result<SmartModuleOutput> {
self.transform.process(input, &mut self.instance, ctx)
}

pub fn init(&mut self, ctx: &mut WasmedgeContext) -> Result<()> {
if let Some(init) = &mut self.init {
let input = SmartModuleInitInput {
params: self.instance.params.clone(),
};
init.initialize(input, &mut self.instance, ctx)
} else {
Ok(())
}
}
}
use std::sync::Mutex;

// TODO: revise later to see whether Clone is necessary
#[derive(Clone)]
Expand Down
127 changes: 48 additions & 79 deletions crates/fluvio-smartengine/src/engine/wasmedge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,28 @@ mod instance;
mod transforms;
use instance::*;
mod memory;
use memory::*;

use tracing::debug;
use wasmedge_sdk::error::HostFuncError;
use wasmedge_sdk::types::Val;
use wasmedge_sdk::{
Executor, Func, Instance, Memory, Module, Store, CallingFrame, WasmValue, Caller,
ImportObjectBuilder,
Executor, Func, Module, Store, CallingFrame, WasmValue, Caller, ImportObjectBuilder,
};

use crate::{SmartModuleChainBuilder};
use crate::engine::common::WasmFn;
use crate::engine::config::*;
use crate::engine::error::EngineError;
use crate::metrics::SmartModuleChainMetrics;
use anyhow::Result;
use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInput, SmartModuleOutput, SmartModuleExtraParams,
};
use std::any::Any;
use std::fmt::{self, Debug};
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use self::transforms::create_transform;

use super::common::SmartModuleInit;

type Init = SmartModuleInit<WasmedgeFn>;
type SmartModuleInit = super::common::SmartModuleInit<WasmedgeFn>;
type SmartModuleInstance = super::common::SmartModuleInstance<WasmedgeInstance, WasmedgeFn>;

pub struct WasmedgeInstance {
instance: wasmedge_sdk::Instance,
Expand Down Expand Up @@ -72,7 +67,7 @@ impl super::common::WasmInstance for WasmedgeInstance {
let array_ptr =
memory::copy_memory_to_instance(&mut ctx.engine, &self.instance, &input_data)?;
let length = input_data.len();
Ok((array_ptr as i32, length as i32, self.version as i32))
Ok((array_ptr, length as i32, self.version as i32))
}

fn read_output<D: fluvio_protocol::Decoder + Default>(
Expand All @@ -88,6 +83,10 @@ impl super::common::WasmInstance for WasmedgeInstance {
output.decode(&mut std::io::Cursor::new(bytes), self.version)?;
Ok(output)
}

fn params(&self) -> SmartModuleExtraParams {
self.params.clone()
}
}

impl WasmFn for WasmedgeFn {
Expand All @@ -97,9 +96,9 @@ impl WasmFn for WasmedgeFn {
let res = self.call(
&ctx.engine,
vec![
Val::I32(ptr as i32).into(),
Val::I32(len as i32).into(),
Val::I32(version as i32).into(),
Val::I32(ptr).into(),
Val::I32(len).into(),
Val::I32(version).into(),
],
)?;
Ok(res[0].to_i32())
Expand Down Expand Up @@ -164,88 +163,58 @@ impl WasmedgeInstance {
}
}

pub struct SmartEngine();
#[derive(Clone)]
pub struct SmartEngineImp();

#[allow(clippy::new_without_default)]
impl SmartEngine {
impl SmartEngineImp {
pub fn new() -> Self {
Self()
}
}

impl Debug for SmartEngine {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SmartModuleEngine")
}
}

/// Building SmartModule
#[derive(Default)]
pub struct SmartModuleChainBuilder {
smart_modules: Vec<(SmartModuleConfig, Vec<u8>)>,
}

impl SmartModuleChainBuilder {
/// Add SmartModule with a single transform and init
pub fn add_smart_module(&mut self, config: SmartModuleConfig, bytes: Vec<u8>) {
self.smart_modules.push((config, bytes))
}

/// stop adding smartmodule and return SmartModuleChain that can be executed
pub fn initialize(self, _engine: &SmartEngine) -> Result<SmartModuleChainInstance> {
let mut executor = Executor::new(None, None).expect("Failed to create WasmEdge executor");
let mut store = Store::new().expect("Failed to create WasmEdge store");
let mut ctx = WasmedgeContext { engine: executor };

let mut instances = Vec::with_capacity(self.smart_modules.len());
// let mut state = engine.new_state();
for (config, bytes) in self.smart_modules {
let module = Module::from_bytes(None, bytes)?;
let version = config.version();
let mut instance = WasmedgeInstance::instantiate(
&mut store,
&mut ctx.engine,
module,
config.params,
version,
)?;

let init = Init::try_instantiate(&mut instance, &mut ctx)?;
let transform = create_transform(&mut instance, &mut ctx, config.initial_data)?;
let mut instance = SmartModuleInstance {
instance,
transform,
init,
};
instance.init(&mut ctx)?;
instances.push(instance);
}
pub fn initialize_imp(
builder: SmartModuleChainBuilder,
_engine: &SmartEngineImp,
) -> Result<SmartModuleChainInstanceImp> {
let executor = Executor::new(None, None).expect("Failed to create WasmEdge executor");
let mut store = Store::new().expect("Failed to create WasmEdge store");
let mut ctx = WasmedgeContext { engine: executor };

let mut instances = Vec::with_capacity(builder.smart_modules.len());
// let mut state = engine.new_state();
for (config, bytes) in builder.smart_modules {
let module = Module::from_bytes(None, bytes)?;
let version = config.version();
let mut instance = WasmedgeInstance::instantiate(
&mut store,
&mut ctx.engine,
module,
config.params,
version,
)?;

Ok(SmartModuleChainInstance { ctx, instances })
let init = SmartModuleInit::try_instantiate(&mut instance, &mut ctx)?;
let transform = create_transform(&mut instance, &mut ctx, config.initial_data)?;
let mut instance = SmartModuleInstance {
instance,
transform,
init,
};
instance.init(&mut ctx)?;
instances.push(instance);
}
}

impl<T: Into<Vec<u8>>> From<(SmartModuleConfig, T)> for SmartModuleChainBuilder {
fn from(pair: (SmartModuleConfig, T)) -> Self {
let mut result = Self::default();
result.add_smart_module(pair.0, pair.1.into());
result
}
Ok(SmartModuleChainInstanceImp { ctx, instances })
}

/// SmartModule Chain Instance that can be executed
pub struct SmartModuleChainInstance {
pub struct SmartModuleChainInstanceImp {
ctx: WasmedgeContext,
instances: Vec<SmartModuleInstance>,
}

impl Debug for SmartModuleChainInstance {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SmartModuleChainInstance")
}
}

impl SmartModuleChainInstance {
impl SmartModuleChainInstanceImp {
/// A single record is processed thru all smartmodules in the chain.
/// The output of one smartmodule is the input of the next smartmodule.
/// A single record may result in multiple records.
Expand Down
Loading