Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Apr 15, 2023
1 parent 7a30006 commit 95c20a2
Show file tree
Hide file tree
Showing 27 changed files with 594 additions and 330 deletions.
190 changes: 190 additions & 0 deletions crates/fluvio-smartengine/src/engine/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use std::any::Any;

use anyhow::Result;
use fluvio_protocol::{Encoder, Decoder};
use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInput, SmartModuleOutput, SmartModuleTransformErrorStatus,
};

pub trait WasmInstance {
type Context;
type Func: WasmFn<Context = Self::Context>;

fn get_fn(&self, name: &str, ctx: &mut Self::Context) -> Result<Option<Self::Func>>;

fn write_input<E: Encoder>(
&mut self,
input: &E,
ctx: &mut Self::Context,
) -> Result<(i32, i32, i32)>;
fn read_output<D: Decoder + Default>(&mut self, ctx: &mut Self::Context) -> Result<D>;
}

/// All smartmodule wasm functions have the same ABI:
/// `(ptr: *mut u8, len: usize, version: i16) -> i32`, which is `(param i32 i32 i32) (result i32)` in wasm.
pub trait WasmFn {
type Context;
fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result<i32>;
}

pub trait SmartModuleTransform<I: WasmInstance>: Send + Sync {
/// transform records
fn process(
&mut self,
input: SmartModuleInput,
instance: &mut I,
ctx: &mut I::Context,
) -> Result<SmartModuleOutput>;

/// return name of transform, this is used for identifying transform and debugging
fn name(&self) -> &str;
}

// In order turn to any, need following magic trick
pub(crate) trait DowncastableTransform<I: WasmInstance>:
SmartModuleTransform<I> + Any
{
fn as_any(&self) -> &dyn Any;
}

impl<T: SmartModuleTransform<I> + Any, I: WasmInstance> DowncastableTransform<I> for T {
fn as_any(&self) -> &dyn Any {
self
}
}

pub struct SimpleTransformImpl<F: WasmFn + Send + Sync> {
name: String,
func: F,
}

impl<I: WasmInstance, F: WasmFn<Context = I::Context> + Send + Sync> SmartModuleTransform<I>
for SimpleTransformImpl<F>
{
fn process(
&mut self,
input: SmartModuleInput,
instance: &mut I,
ctx: &mut I::Context,
) -> Result<SmartModuleOutput> {
let (ptr, len, version) = instance.write_input(&input, ctx)?;
let output = self.func.call(ptr, len, version, ctx)?;

if output < 0 {
let internal_error = SmartModuleTransformErrorStatus::try_from(output)
.unwrap_or(SmartModuleTransformErrorStatus::UnknownError);
return Err(internal_error.into());
}

let output: SmartModuleOutput = instance.read_output(ctx)?;
Ok(output)
}

fn name(&self) -> &str {
&self.name
}
}

impl<F: WasmFn + Send + Sync> SimpleTransformImpl<F> {
pub(crate) fn try_instantiate<I>(
name: &str,
instance: &mut I,
ctx: &mut <I as WasmInstance>::Context,
) -> Result<Option<Self>>
where
I: WasmInstance<Func = F>,
F: WasmFn<Context = I::Context>,
{
let func = instance
.get_fn(name, ctx)?
.ok_or_else(|| anyhow::anyhow!("{} not found", name))?;
Ok(Some(Self {
name: name.to_owned(),
func,
}))
}
}

mod wasmtime {
use anyhow::Result;
use fluvio_protocol::{Encoder, Decoder};
use tracing::debug;

use std::sync::Arc;

use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
use wasmtime::{Instance, Store};

use super::super::instance::RecordsCallBack;

pub struct WasmTimeInstance {
instance: Instance,
records_cb: Arc<RecordsCallBack>,
params: SmartModuleExtraParams,
version: i16,
}
pub struct WasmTimeContext {
store: Store<()>,
}

pub type WasmTimeFn = wasmtime::TypedFunc<(i32, i32, i32), i32>;

impl super::WasmInstance for WasmTimeInstance {
type Context = WasmTimeContext;

type Func = WasmTimeFn;

fn get_fn(&self, name: &str, ctx: &mut Self::Context) -> Result<Option<Self::Func>> {
match self.instance.get_func(&mut ctx.store, name) {
Some(func) => {
// check type signature
func.typed(&mut ctx.store)
.or_else(|_| func.typed(&ctx.store))
.map(|f| Some(f))
}
None => Ok(None),
}
}

fn write_input<E: Encoder>(
&mut self,
input: &E,
ctx: &mut Self::Context,
) -> anyhow::Result<(i32, i32, i32)> {
self.records_cb.clear();
let mut input_data = Vec::new();
input.encode(&mut input_data, self.version)?;
debug!(
len = input_data.len(),
version = self.version,
"input encoded"
);
let array_ptr = super::super::memory::copy_memory_to_instance(
&mut ctx.store,
&self.instance,
&input_data,
)?;
let length = input_data.len();
Ok((array_ptr as i32, length as i32, self.version as i32))
}

fn read_output<D: Decoder + Default>(&mut self, ctx: &mut Self::Context) -> Result<D> {
let bytes = self
.records_cb
.get()
.and_then(|m| m.copy_memory_from(&ctx.store).ok())
.unwrap_or_default();
let mut output = D::default();
output.decode(&mut std::io::Cursor::new(bytes), self.version)?;
Ok(output)
}
}

impl super::WasmFn for WasmTimeFn {
type Context = WasmTimeContext;

fn call(&self, ptr: i32, len: i32, version: i32, ctx: &mut Self::Context) -> Result<i32> {
WasmTimeFn::call(self, &mut ctx.store, (ptr, len, version))
}
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use wasmtime::{Engine, Module};

use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput};

use crate::config::*;
use crate::init::SmartModuleInit;
use crate::instance::{SmartModuleInstance, SmartModuleInstanceContext};
use super::config::*;
use super::init::SmartModuleInit;
use super::instance::{SmartModuleInstance, SmartModuleInstanceContext};

use crate::metrics::SmartModuleChainMetrics;
use crate::state::WasmState;
use crate::transforms::create_transform;
use super::metrics::SmartModuleChainMetrics;
use super::state::WasmState;
use super::transforms::create_transform;

#[derive(Clone)]
pub struct SmartEngine(Engine);
Expand Down Expand Up @@ -174,15 +174,15 @@ mod chaining_test {

use fluvio_smartmodule::{dataplane::smartmodule::SmartModuleInput, Record};

use crate::{
use super::super::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData,
metrics::SmartModuleChainMetrics,
};

const SM_FILTER_INIT: &str = "fluvio_smartmodule_filter_init";
const SM_MAP: &str = "fluvio_smartmodule_map";

use crate::fixture::read_wasm_module;
use super::super::fixture::read_wasm_module;

#[ignore]
#[test]
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use fluvio_smartmodule::dataplane::smartmodule::{
};
use wasmtime::{AsContextMut, TypedFunc};

use crate::instance::SmartModuleInstanceContext;
use super::instance::SmartModuleInstanceContext;

pub(crate) const INIT_FN_NAME: &str = "init";
type WasmInitFn = TypedFunc<(i32, i32, u32), i32>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleExtraParams, SmartModuleInput, SmartModuleOutput, SmartModuleInitInput,
};

use crate::error::EngineError;
use crate::init::SmartModuleInit;
use crate::{WasmSlice, memory};
use crate::state::WasmState;
use super::error::EngineError;
use super::init::SmartModuleInit;
use super::{WasmSlice, memory};
use super::state::WasmState;

pub(crate) struct SmartModuleInstance {
ctx: SmartModuleInstanceContext,
Expand Down Expand Up @@ -186,7 +186,7 @@ pub struct RecordsMemory {
}

impl RecordsMemory {
fn copy_memory_from(&self, store: impl AsContext) -> Result<Vec<u8>> {
pub fn copy_memory_from(&self, store: impl AsContext) -> Result<Vec<u8>> {
let mut bytes = vec![0u8; self.len as u32 as usize];
self.memory.read(store, self.ptr as usize, &mut bytes)?;
Ok(bytes)
Expand Down
File renamed without changes.
File renamed without changes.
40 changes: 40 additions & 0 deletions crates/fluvio-smartengine/src/engine/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#[cfg(test)]
mod fixture;
mod config;

pub type WasmSlice = (i32, i32, u32);
pub type Version = i16;

cfg_if::cfg_if! {
// TODO: turn on this check when ready
// #[cfg(all(feature = "wasmedge_engine", feature = "wasmtime_engine"))]
// compile_error!(
// "Both `wasmedge_engine` and `wasmtime_engine` features are enabled, but \
// only one WASM runtime is supported at a time"
// );


if #[cfg(feature = "wasmtime-engine")] {
pub(crate) mod memory;
pub(crate) mod transforms;
pub(crate) mod init;
pub(crate) mod error;
pub mod metrics;
mod state;
mod engine;
pub use config::{SmartModuleConfig, SmartModuleConfigBuilder, SmartModuleConfigBuilderError, SmartModuleInitialData};
pub mod instance;

pub(crate) mod common;

// TODO: move wasmtime engine to a module; or maybe add some common abstraction?
pub use engine::{SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance};


} else if #[cfg(feature = "wasmedge-engine")] {
mod wasmedge_engine;
mod test_use {
pub use super::wasmedge_engine::{SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance};
}
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInput, SmartModuleOutput, SmartModuleAggregateInput, SmartModuleAggregateOutput,
SmartModuleTransformErrorStatus,
};
use crate::{
use super::super::{
instance::{SmartModuleInstanceContext, SmartModuleTransform},
SmartModuleInitialData,
state::WasmState,
Expand Down Expand Up @@ -111,14 +111,14 @@ mod test {
Record,
};

use crate::{
use crate::engine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData,
metrics::SmartModuleChainMetrics,
};

const SM_AGGEGRATE: &str = "fluvio_smartmodule_aggregate";

use crate::fixture::read_wasm_module;
use crate::engine::fixture::read_wasm_module;

#[ignore]
#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ mod test {
Record,
};

use crate::{
use crate::engine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics,
transforms::simple_transform::ARRAY_MAP_FN_NAME,
};

const SM_ARRAY_MAP: &str = "fluvio_smartmodule_array_map_array";

use crate::fixture::read_wasm_module;
use crate::engine::fixture::read_wasm_module;

#[ignore]
#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ mod test {
Record,
};

use crate::{
use crate::engine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics,
transforms::simple_transform::FILTER_FN_NAME,
};

const SM_FILTER: &str = "fluvio_smartmodule_filter";
const SM_FILTER_INIT: &str = "fluvio_smartmodule_filter_init";

use crate::fixture::read_wasm_module;
use crate::engine::fixture::read_wasm_module;

#[ignore]
#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ mod test {
Record,
};

use crate::{
use crate::engine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics,
transforms::simple_transform::FILTER_MAP_FN_NAME,
};

const SM_FILTER_MAP: &str = "fluvio_smartmodule_filter_map";

use crate::fixture::read_wasm_module;
use crate::engine::fixture::read_wasm_module;

#[ignore]
#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ mod test {
Record,
};

use crate::{
use crate::engine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics,
transforms::simple_transform::MAP_FN_NAME,
};
use crate::fixture::read_wasm_module;
use crate::engine::fixture::read_wasm_module;

const SM_MAP: &str = "fluvio_smartmodule_map";

Expand Down
Loading

0 comments on commit 95c20a2

Please sign in to comment.