Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Apr 15, 2023
1 parent 7a30006 commit 685ae36
Show file tree
Hide file tree
Showing 27 changed files with 722 additions and 406 deletions.
190 changes: 190 additions & 0 deletions crates/fluvio-smartengine/src/engine/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use std::any::Any;

use anyhow::Result;
use fluvio_protocol::{Encoder, Decoder};
use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInput, SmartModuleOutput, SmartModuleTransformErrorStatus,
};

pub trait WasmInstance {
type Context;
type Func: WasmFn<Context = Self::Context>;

fn get_fn(&self, name: &str, ctx: &mut Self::Context) -> Result<Option<Self::Func>>;

fn write_input<E: Encoder>(
&mut self,
input: &E,
ctx: &mut Self::Context,
) -> Result<(i32, i32, i32)>;
fn read_output<D: Decoder + Default>(&mut self, ctx: &mut Self::Context) -> Result<D>;
}

/// All smartmodule wasm functions have the same ABI:
/// `(ptr: *mut u8, len: usize, version: i16) -> i32`, which is `(param i32 i32 i32) (result i32)` in wasm.
pub trait WasmFn {
type Context;
fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result<i32>;
}

pub trait SmartModuleTransform<I: WasmInstance>: Send + Sync {
/// transform records
fn process(
&mut self,
input: SmartModuleInput,
instance: &mut I,
ctx: &mut I::Context,
) -> Result<SmartModuleOutput>;

/// return name of transform, this is used for identifying transform and debugging
fn name(&self) -> &str;
}

// In order turn to any, need following magic trick
pub(crate) trait DowncastableTransform<I: WasmInstance>:
SmartModuleTransform<I> + Any
{
fn as_any(&self) -> &dyn Any;
}

impl<T: SmartModuleTransform<I> + Any, I: WasmInstance> DowncastableTransform<I> for T {
fn as_any(&self) -> &dyn Any {
self
}
}

pub struct SimpleTransformImpl<F: WasmFn + Send + Sync> {
name: String,
func: F,
}

impl<I: WasmInstance, F: WasmFn<Context = I::Context> + Send + Sync> SmartModuleTransform<I>
for SimpleTransformImpl<F>
{
fn process(
&mut self,
input: SmartModuleInput,
instance: &mut I,
ctx: &mut I::Context,
) -> Result<SmartModuleOutput> {
let (ptr, len, version) = instance.write_input(&input, ctx)?;
let output = self.func.call(ptr, len, version, ctx)?;

if output < 0 {
let internal_error = SmartModuleTransformErrorStatus::try_from(output)
.unwrap_or(SmartModuleTransformErrorStatus::UnknownError);
return Err(internal_error.into());
}

let output: SmartModuleOutput = instance.read_output(ctx)?;
Ok(output)
}

fn name(&self) -> &str {
&self.name
}
}

impl<F: WasmFn + Send + Sync> SimpleTransformImpl<F> {
pub(crate) fn try_instantiate<I>(
name: &str,
instance: &mut I,
ctx: &mut <I as WasmInstance>::Context,
) -> Result<Option<Self>>
where
I: WasmInstance<Func = F>,
F: WasmFn<Context = I::Context>,
{
let func = instance
.get_fn(name, ctx)?
.ok_or_else(|| anyhow::anyhow!("{} not found", name))?;
Ok(Some(Self {
name: name.to_owned(),
func,
}))
}
}

mod wasmtime {
use anyhow::Result;
use fluvio_protocol::{Encoder, Decoder};
use tracing::debug;

use std::sync::Arc;

use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
use wasmtime::{Instance, Store};

use super::super::instance::RecordsCallBack;

pub struct WasmTimeInstance {
instance: Instance,
records_cb: Arc<RecordsCallBack>,
params: SmartModuleExtraParams,
version: i16,
}
pub struct WasmTimeContext {
store: Store<()>,
}

pub type WasmTimeFn = wasmtime::TypedFunc<(i32, i32, i32), i32>;

impl super::WasmInstance for WasmTimeInstance {
type Context = WasmTimeContext;

type Func = WasmTimeFn;

fn get_fn(&self, name: &str, ctx: &mut Self::Context) -> Result<Option<Self::Func>> {
match self.instance.get_func(&mut ctx.store, name) {
Some(func) => {
// check type signature
func.typed(&mut ctx.store)
.or_else(|_| func.typed(&ctx.store))
.map(|f| Some(f))
}
None => Ok(None),
}
}

fn write_input<E: Encoder>(
&mut self,
input: &E,
ctx: &mut Self::Context,
) -> anyhow::Result<(i32, i32, i32)> {
self.records_cb.clear();
let mut input_data = Vec::new();
input.encode(&mut input_data, self.version)?;
debug!(
len = input_data.len(),
version = self.version,
"input encoded"
);
let array_ptr = super::super::memory::copy_memory_to_instance(
&mut ctx.store,
&self.instance,
&input_data,
)?;
let length = input_data.len();
Ok((array_ptr as i32, length as i32, self.version as i32))
}

fn read_output<D: Decoder + Default>(&mut self, ctx: &mut Self::Context) -> Result<D> {
let bytes = self
.records_cb
.get()
.and_then(|m| m.copy_memory_from(&ctx.store).ok())
.unwrap_or_default();
let mut output = D::default();
output.decode(&mut std::io::Cursor::new(bytes), self.version)?;
Ok(output)
}
}

impl super::WasmFn for WasmTimeFn {
type Context = WasmTimeContext;

fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result<i32> {
WasmTimeFn::call(self, &mut ctx.store, (ptr, len, version))
}
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
use std::fmt::{self, Debug};

use anyhow::Result;
use tracing::debug;
use wasmtime::{Engine, Module};

use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput};

use crate::config::*;
use crate::init::SmartModuleInit;
use crate::instance::{SmartModuleInstance, SmartModuleInstanceContext};
use crate::{SmartEngine, SmartModuleChainInstance};

use super::init::SmartModuleInit;
use super::instance::{SmartModuleInstance, SmartModuleInstanceContext};

use crate::metrics::SmartModuleChainMetrics;
use crate::state::WasmState;
use crate::transforms::create_transform;
use super::metrics::SmartModuleChainMetrics;
use super::state::WasmState;
use super::transforms::create_transform;

#[derive(Clone)]
pub struct SmartEngine(Engine);
pub struct SmartEngineImp(Engine);

#[allow(clippy::new_without_default)]
impl SmartEngine {
impl SmartEngineImp {
pub fn new() -> Self {
let mut config = wasmtime::Config::default();
config.consume_fuel(true);
Expand All @@ -30,71 +29,39 @@ impl SmartEngine {
}
}

impl Debug for SmartEngine {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SmartModuleEngine")
}
}

/// Building SmartModule
#[derive(Default)]
pub struct SmartModuleChainBuilder {
smart_modules: Vec<(SmartModuleConfig, Vec<u8>)>,
}

impl SmartModuleChainBuilder {
/// Add SmartModule with a single transform and init
pub fn add_smart_module(&mut self, config: SmartModuleConfig, bytes: Vec<u8>) {
self.smart_modules.push((config, bytes))
/// stop adding smartmodule and return SmartModuleChain that can be executed
pub fn initialize_imp(
builder: super::SmartModuleChainBuilder,
engine: &SmartEngine,
) -> Result<SmartModuleChainInstance> {
let mut instances = Vec::with_capacity(builder.smart_modules.len());
let mut state = engine.inner.new_state();
for (config, bytes) in builder.smart_modules {
let module = Module::new(&engine.inner.0, bytes)?;
let version = config.version();
let ctx =
SmartModuleInstanceContext::instantiate(&mut state, module, config.params, version)?;
let init = SmartModuleInit::try_instantiate(&ctx, &mut state)?;
let transform = create_transform(&ctx, config.initial_data, &mut state)?;
let mut instance = SmartModuleInstance::new(ctx, init, transform);
instance.init(&mut state)?;
instances.push(instance);
}

/// stop adding smartmodule and return SmartModuleChain that can be executed
pub fn initialize(self, engine: &SmartEngine) -> Result<SmartModuleChainInstance> {
let mut instances = Vec::with_capacity(self.smart_modules.len());
let mut state = engine.new_state();
for (config, bytes) in self.smart_modules {
let module = Module::new(&engine.0, bytes)?;
let version = config.version();
let ctx = SmartModuleInstanceContext::instantiate(
&mut state,
module,
config.params,
version,
)?;
let init = SmartModuleInit::try_instantiate(&ctx, &mut state)?;
let transform = create_transform(&ctx, config.initial_data, &mut state)?;
let mut instance = SmartModuleInstance::new(ctx, init, transform);
instance.init(&mut state)?;
instances.push(instance);
}
Ok(SmartModuleChainInstance {
Ok(SmartModuleChainInstance {
inner: SmartModuleChainInstanceImp {
store: state,
instances,
})
}
}

impl<T: Into<Vec<u8>>> From<(SmartModuleConfig, T)> for SmartModuleChainBuilder {
fn from(pair: (SmartModuleConfig, T)) -> Self {
let mut result = Self::default();
result.add_smart_module(pair.0, pair.1.into());
result
}
},
})
}

/// SmartModule Chain Instance that can be executed
pub struct SmartModuleChainInstance {
pub struct SmartModuleChainInstanceImp {
store: WasmState,
instances: Vec<SmartModuleInstance>,
}

impl Debug for SmartModuleChainInstance {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SmartModuleChainInstance")
}
}

impl SmartModuleChainInstance {
impl SmartModuleChainInstanceImp {
#[cfg(test)]
pub(crate) fn instances(&self) -> &Vec<SmartModuleInstance> {
&self.instances
Expand Down Expand Up @@ -154,7 +121,7 @@ impl SmartModuleChainInstance {
#[cfg(test)]
mod test {

use super::SmartModuleConfig;
use crate::SmartModuleConfig;

#[test]
fn test_param() {
Expand All @@ -174,15 +141,15 @@ mod chaining_test {

use fluvio_smartmodule::{dataplane::smartmodule::SmartModuleInput, Record};

use crate::{
use super::super::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData,
metrics::SmartModuleChainMetrics,
};

const SM_FILTER_INIT: &str = "fluvio_smartmodule_filter_init";
const SM_MAP: &str = "fluvio_smartmodule_map";

use crate::fixture::read_wasm_module;
use super::super::fixture::read_wasm_module;

#[ignore]
#[test]
Expand All @@ -206,7 +173,8 @@ mod chaining_test {

let mut chain = chain_builder
.initialize(&engine)
.expect("failed to build chain");
.expect("failed to build chain")
.inner;
assert_eq!(chain.instances().len(), 2);

let input = vec![Record::new("hello world")];
Expand Down Expand Up @@ -260,7 +228,8 @@ mod chaining_test {

let mut chain = chain_builder
.initialize(&engine)
.expect("failed to build chain");
.expect("failed to build chain")
.inner;
assert_eq!(chain.instances().len(), 2);

let input = vec![
Expand Down Expand Up @@ -299,7 +268,8 @@ mod chaining_test {
let chain_builder = SmartModuleChainBuilder::default();
let mut chain = chain_builder
.initialize(&engine)
.expect("failed to build chain");
.expect("failed to build chain")
.inner;

assert_eq!(chain.store.get_used_fuel(), 0);

Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use fluvio_smartmodule::dataplane::smartmodule::{
};
use wasmtime::{AsContextMut, TypedFunc};

use crate::instance::SmartModuleInstanceContext;
use super::instance::SmartModuleInstanceContext;

pub(crate) const INIT_FN_NAME: &str = "init";
type WasmInitFn = TypedFunc<(i32, i32, u32), i32>;
Expand Down
Loading

0 comments on commit 685ae36

Please sign in to comment.