Skip to content

Commit

Permalink
implement filter transform
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Mar 27, 2023
1 parent 3c47164 commit 6d81878
Show file tree
Hide file tree
Showing 9 changed files with 589 additions and 22 deletions.
18 changes: 7 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/fluvio-smartengine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ wasmtime-wasi = { version = "6.0.0", optional = true }
serde = { version = "1", features = ['derive'] }
serde_json = { version = "1.0", optional = true }
serde_yaml = { version = "0.9", default-features = false, optional = true }
wasmedge-sdk = { version = "0.7", optional = true }
# Use the Clone trait
wasmedge-sdk = { git = "https://github.com/WasmEdge/WasmEdge", rev = "fe5fe2c", optional = true }

cfg-if = "1.0.0"
derive_builder = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-smartengine/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use derive_builder::Builder;
use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleExtraParams};
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;

const DEFAULT_SMARTENGINE_VERSION: i16 = 17;

Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-smartengine/src/wasmedge_engine/init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub struct SmartModuleInit {}
223 changes: 223 additions & 0 deletions crates/fluvio-smartengine/src/wasmedge_engine/instance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use fluvio_protocol::{Decoder, Encoder};
use tracing::debug;
use wasmedge_sdk::error::HostFuncError;
use wasmedge_sdk::types::Val;
use wasmedge_sdk::{
host_function, Caller, CallingFrame, Engine, Executor, Func, ImportObjectBuilder, Instance,
Memory, Module, Store, WasmValue,
};

use super::init::SmartModuleInit;
use crate::error::EngineError;
use crate::metrics::SmartModuleChainMetrics;
use crate::wasmedge_engine::memory;
use crate::{config::*, WasmSlice};
use anyhow::Result;
use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput};
use std::any::Any;
use std::fmt::{self, Debug};
use std::sync::{Arc, Mutex};

pub struct SmartModuleInstance {
ctx: SmartModuleInstanceContext,
transform: Box<dyn DowncastableTransform>,
// init: Option<SmartModuleInit>,
}

impl SmartModuleInstance {
#[cfg(test)]
#[allow(clippy::borrowed_box)]
pub(crate) fn transform(&self) -> &Box<dyn DowncastableTransform> {
&self.transform
}

#[cfg(test)]
pub(crate) fn get_init(&self) -> &Option<SmartModuleInit> {
&None
}

pub(crate) fn new(
ctx: SmartModuleInstanceContext,
// init: Option<SmartModuleInit>,
transform: Box<dyn DowncastableTransform>,
) -> Self {
Self {
ctx,
// init,
transform,
}
}

pub(crate) fn process(
&mut self,
input: SmartModuleInput,
engine: &mut Executor,
) -> Result<SmartModuleOutput> {
self.transform.process(input, &mut self.ctx, engine)
}
}

pub struct SmartModuleInstanceContext {
instance: Instance,
records_cb: Arc<RecordsCallBack>,
// params: SmartModuleExtraParams,
version: i16,
}

impl SmartModuleInstanceContext {
/// get wasm function from instance
pub(crate) fn get_wasm_func(&self, name: &str) -> Option<Func> {
self.instance.func(name)
}

/// instantiate new module instance that contain context
pub(crate) fn instantiate(
store: &mut Store,
executor: &mut Executor,
module: Module,
// params: SmartModuleExtraParams,
version: i16,
) -> Result<Self, EngineError> {
debug!("creating WasmModuleInstance");
let cb = Arc::new(RecordsCallBack::new());
let records_cb = cb.clone();

// See crates/fluvio-smartmodule-derive/src/generator/transform.rs for copy_records
let copy_records_fn = move |caller: CallingFrame,
inputs: Vec<WasmValue>|
-> Result<Vec<WasmValue>, HostFuncError> {
assert_eq!(inputs.len(), 2);
let ptr = inputs[0].to_i32() as u32;
let len = inputs[1].to_i32() as u32;

debug!(len, "callback from wasm filter");
let caller = Caller::new(caller);
let memory = caller.memory(0).unwrap();

let records = RecordsMemory { ptr, len, memory };
cb.set(records);
Ok(vec![])
};

let import = ImportObjectBuilder::new()
.with_func::<(i32, i32), ()>("copy_records", copy_records_fn)
.map_err(|e| EngineError::Instantiate(e.into()))?
.build("env")
.map_err(|e| EngineError::Instantiate(e.into()))?;

debug!("instantiating WASMtime");
store
.register_import_module(executor, &import)
.map_err(|e| EngineError::Instantiate(e.into()))?;
let instance = store
.register_active_module(executor, &module)
.map_err(|e| EngineError::Instantiate(e.into()))?;

// This is a hack to avoid them being dropped
// FIXME: manage their lifetimes
std::mem::forget(import);
std::mem::forget(module);

Ok(Self {
instance,
records_cb,
// params,
version,
})
}

pub(crate) fn write_input<E: Encoder>(
&mut self,
input: &E,
engine: &mut impl Engine,
) -> Result<Vec<WasmValue>> {
self.records_cb.clear();
let mut input_data = Vec::new();
input.encode(&mut input_data, self.version)?;
debug!(
len = input_data.len(),
version = self.version,
"input encoded"
);
let array_ptr = memory::copy_memory_to_instance(engine, &self.instance, &input_data)?;
let length = input_data.len();
Ok(vec![
Val::I32(array_ptr as i32).into(),
Val::I32(length as i32).into(),
Val::I32(self.version as i32).into(),
])
}

pub(crate) fn read_output<D: Decoder + Default>(&mut self) -> Result<D> {
let bytes = self
.records_cb
.get()
.and_then(|m| m.copy_memory_from().ok())
.unwrap_or_default();
let mut output = D::default();
output.decode(&mut std::io::Cursor::new(bytes), self.version)?;
Ok(output)
}
}

// TODO: revise later to see whether Clone is necessary
#[derive(Clone)]
pub struct RecordsMemory {
ptr: u32,
len: u32,
memory: Memory,
}

impl RecordsMemory {
fn copy_memory_from(&self) -> Result<Vec<u8>> {
let mut bytes = self.memory.read(self.ptr, self.len)?;
Ok(bytes)
}
}

pub struct RecordsCallBack(Mutex<Option<RecordsMemory>>);

impl RecordsCallBack {
pub(crate) fn new() -> Self {
Self(Mutex::new(None))
}

pub(crate) fn set(&self, records: RecordsMemory) {
let mut write_inner = self.0.lock().unwrap();
write_inner.replace(records);
}

pub(crate) fn clear(&self) {
let mut write_inner = self.0.lock().unwrap();
write_inner.take();
}

pub(crate) fn get(&self) -> Option<RecordsMemory> {
let reader = self.0.lock().unwrap();
reader.clone()
}
}

pub(crate) trait SmartModuleTransform: Send + Sync {
/// transform records
fn process(
&mut self,
input: SmartModuleInput,
ctx: &mut SmartModuleInstanceContext,
engine: &mut Executor,
) -> Result<SmartModuleOutput>;

/// return name of transform, this is used for identifying transform and debugging
fn name(&self) -> &str;
}

// In order turn to any, need following magic trick
pub(crate) trait DowncastableTransform: SmartModuleTransform + Any {
fn as_any(&self) -> &dyn Any;
}

impl<T: SmartModuleTransform + Any> DowncastableTransform for T {
fn as_any(&self) -> &dyn Any {
self
}
}
43 changes: 43 additions & 0 deletions crates/fluvio-smartengine/src/wasmedge_engine/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use anyhow::{anyhow, Error, Result};
use wasmedge_sdk::{types::Val, Engine, Instance};

const ALLOC_FN: &str = "alloc";
const MEMORY: &str = "memory";
// const ARRAY_SUM_FN: &str = "array_sum";
// const UPPER_FN: &str = "upper";
// const DEALLOC_FN: &str = "dealloc";

/// Copy a byte array into an instance's linear memory
/// and return the offset relative to the module's memory.
pub(crate) fn copy_memory_to_instance(
engine: &mut impl Engine,
instance: &Instance,
bytes: &[u8],
) -> Result<i32, Error> {
// Get the "memory" export of the module.
// If the module does not export it, just panic,
// since we are not going to be able to copy the data.
let mut memory = instance
.memory(MEMORY)
.ok_or_else(|| anyhow!("Missing memory"))?;

// The module is not using any bindgen libraries,
// so it should export its own alloc function.
//
// Get the guest's exported alloc function, and call it with the
// length of the byte array we are trying to copy.
// The result is an offset relative to the module's linear memory,
// which is used to copy the bytes into the module's memory.
// Then, return the offset.
let alloc = instance
.func(ALLOC_FN)
.ok_or_else(|| anyhow!("missing alloc"))?;

let alloc_result = alloc.call(engine, [Val::I32(bytes.len() as i32).into()])?;

let guest_ptr_offset = alloc_result[0].to_i32();

memory.write(bytes, guest_ptr_offset as u32)?;

Ok(guest_ptr_offset)
}
Loading

0 comments on commit 6d81878

Please sign in to comment.