forked from infinyon/fluvio
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
588 additions
and
20 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
pub struct SmartModuleInit {} |
223 changes: 223 additions & 0 deletions
223
crates/fluvio-smartengine/src/wasmedge_engine/instance.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
use fluvio_protocol::{Decoder, Encoder}; | ||
use tracing::debug; | ||
use wasmedge_sdk::error::HostFuncError; | ||
use wasmedge_sdk::types::Val; | ||
use wasmedge_sdk::{ | ||
host_function, Caller, CallingFrame, Engine, Executor, Func, ImportObjectBuilder, Instance, | ||
Memory, Module, Store, WasmValue, | ||
}; | ||
|
||
use super::init::SmartModuleInit; | ||
use crate::error::EngineError; | ||
use crate::metrics::SmartModuleChainMetrics; | ||
use crate::wasmedge_engine::memory; | ||
use crate::{config::*, WasmSlice}; | ||
use anyhow::Result; | ||
use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput}; | ||
use std::any::Any; | ||
use std::fmt::{self, Debug}; | ||
use std::sync::{Arc, Mutex}; | ||
|
||
pub struct SmartModuleInstance { | ||
ctx: SmartModuleInstanceContext, | ||
transform: Box<dyn DowncastableTransform>, | ||
// init: Option<SmartModuleInit>, | ||
} | ||
|
||
impl SmartModuleInstance { | ||
#[cfg(test)] | ||
#[allow(clippy::borrowed_box)] | ||
pub(crate) fn transform(&self) -> &Box<dyn DowncastableTransform> { | ||
&self.transform | ||
} | ||
|
||
#[cfg(test)] | ||
pub(crate) fn get_init(&self) -> &Option<SmartModuleInit> { | ||
&None | ||
} | ||
|
||
pub(crate) fn new( | ||
ctx: SmartModuleInstanceContext, | ||
// init: Option<SmartModuleInit>, | ||
transform: Box<dyn DowncastableTransform>, | ||
) -> Self { | ||
Self { | ||
ctx, | ||
// init, | ||
transform, | ||
} | ||
} | ||
|
||
pub(crate) fn process( | ||
&mut self, | ||
input: SmartModuleInput, | ||
engine: &mut Executor, | ||
) -> Result<SmartModuleOutput> { | ||
self.transform.process(input, &mut self.ctx, engine) | ||
} | ||
} | ||
|
||
pub struct SmartModuleInstanceContext { | ||
instance: Instance, | ||
records_cb: Arc<RecordsCallBack>, | ||
// params: SmartModuleExtraParams, | ||
version: i16, | ||
} | ||
|
||
impl SmartModuleInstanceContext { | ||
/// get wasm function from instance | ||
pub(crate) fn get_wasm_func(&self, name: &str) -> Option<Func> { | ||
self.instance.func(name) | ||
} | ||
|
||
/// instantiate new module instance that contain context | ||
pub(crate) fn instantiate( | ||
store: &mut Store, | ||
executor: &mut Executor, | ||
module: Module, | ||
// params: SmartModuleExtraParams, | ||
version: i16, | ||
) -> Result<Self, EngineError> { | ||
debug!("creating WasmModuleInstance"); | ||
let cb = Arc::new(RecordsCallBack::new()); | ||
let records_cb = cb.clone(); | ||
|
||
// See crates/fluvio-smartmodule-derive/src/generator/transform.rs for copy_records | ||
let copy_records_fn = move |caller: CallingFrame, | ||
inputs: Vec<WasmValue>| | ||
-> Result<Vec<WasmValue>, HostFuncError> { | ||
assert_eq!(inputs.len(), 2); | ||
let ptr = inputs[0].to_i32() as u32; | ||
let len = inputs[1].to_i32() as u32; | ||
|
||
debug!(len, "callback from wasm filter"); | ||
let caller = Caller::new(caller); | ||
let memory = caller.memory(0).unwrap(); | ||
|
||
let records = RecordsMemory { ptr, len, memory }; | ||
cb.set(records); | ||
Ok(vec![]) | ||
}; | ||
|
||
let import = ImportObjectBuilder::new() | ||
.with_func::<(i32, i32), ()>("copy_records", copy_records_fn) | ||
.map_err(|e| EngineError::Instantiate(e.into()))? | ||
.build("env") | ||
.map_err(|e| EngineError::Instantiate(e.into()))?; | ||
|
||
debug!("instantiating WASMtime"); | ||
store | ||
.register_import_module(executor, &import) | ||
.map_err(|e| EngineError::Instantiate(e.into()))?; | ||
let instance = store | ||
.register_active_module(executor, &module) | ||
.map_err(|e| EngineError::Instantiate(e.into()))?; | ||
|
||
// This is a hack to avoid them being dropped | ||
// FIXME: manage their lifetimes | ||
std::mem::forget(import); | ||
std::mem::forget(module); | ||
|
||
Ok(Self { | ||
instance, | ||
records_cb, | ||
// params, | ||
version, | ||
}) | ||
} | ||
|
||
pub(crate) fn write_input<E: Encoder>( | ||
&mut self, | ||
input: &E, | ||
engine: &mut impl Engine, | ||
) -> Result<Vec<WasmValue>> { | ||
self.records_cb.clear(); | ||
let mut input_data = Vec::new(); | ||
input.encode(&mut input_data, self.version)?; | ||
debug!( | ||
len = input_data.len(), | ||
version = self.version, | ||
"input encoded" | ||
); | ||
let array_ptr = memory::copy_memory_to_instance(engine, &self.instance, &input_data)?; | ||
let length = input_data.len(); | ||
Ok(vec![ | ||
Val::I32(array_ptr as i32).into(), | ||
Val::I32(length as i32).into(), | ||
Val::I32(self.version as i32).into(), | ||
]) | ||
} | ||
|
||
pub(crate) fn read_output<D: Decoder + Default>(&mut self) -> Result<D> { | ||
let bytes = self | ||
.records_cb | ||
.get() | ||
.and_then(|m| m.copy_memory_from().ok()) | ||
.unwrap_or_default(); | ||
let mut output = D::default(); | ||
output.decode(&mut std::io::Cursor::new(bytes), self.version)?; | ||
Ok(output) | ||
} | ||
} | ||
|
||
// TODO: revise later to see whether Clone is necessary | ||
#[derive(Clone)] | ||
pub struct RecordsMemory { | ||
ptr: u32, | ||
len: u32, | ||
memory: Memory, | ||
} | ||
|
||
impl RecordsMemory { | ||
fn copy_memory_from(&self) -> Result<Vec<u8>> { | ||
let mut bytes = self.memory.read(self.ptr, self.len)?; | ||
Ok(bytes) | ||
} | ||
} | ||
|
||
pub struct RecordsCallBack(Mutex<Option<RecordsMemory>>); | ||
|
||
impl RecordsCallBack { | ||
pub(crate) fn new() -> Self { | ||
Self(Mutex::new(None)) | ||
} | ||
|
||
pub(crate) fn set(&self, records: RecordsMemory) { | ||
let mut write_inner = self.0.lock().unwrap(); | ||
write_inner.replace(records); | ||
} | ||
|
||
pub(crate) fn clear(&self) { | ||
let mut write_inner = self.0.lock().unwrap(); | ||
write_inner.take(); | ||
} | ||
|
||
pub(crate) fn get(&self) -> Option<RecordsMemory> { | ||
let reader = self.0.lock().unwrap(); | ||
reader.clone() | ||
} | ||
} | ||
|
||
pub(crate) trait SmartModuleTransform: Send + Sync { | ||
/// transform records | ||
fn process( | ||
&mut self, | ||
input: SmartModuleInput, | ||
ctx: &mut SmartModuleInstanceContext, | ||
engine: &mut Executor, | ||
) -> Result<SmartModuleOutput>; | ||
|
||
/// return name of transform, this is used for identifying transform and debugging | ||
fn name(&self) -> &str; | ||
} | ||
|
||
// In order turn to any, need following magic trick | ||
pub(crate) trait DowncastableTransform: SmartModuleTransform + Any { | ||
fn as_any(&self) -> &dyn Any; | ||
} | ||
|
||
impl<T: SmartModuleTransform + Any> DowncastableTransform for T { | ||
fn as_any(&self) -> &dyn Any { | ||
self | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
use anyhow::{anyhow, Error, Result}; | ||
use wasmedge_sdk::{types::Val, Engine, Instance}; | ||
|
||
const ALLOC_FN: &str = "alloc"; | ||
const MEMORY: &str = "memory"; | ||
// const ARRAY_SUM_FN: &str = "array_sum"; | ||
// const UPPER_FN: &str = "upper"; | ||
// const DEALLOC_FN: &str = "dealloc"; | ||
|
||
/// Copy a byte array into an instance's linear memory | ||
/// and return the offset relative to the module's memory. | ||
pub(crate) fn copy_memory_to_instance( | ||
engine: &mut impl Engine, | ||
instance: &Instance, | ||
bytes: &[u8], | ||
) -> Result<i32, Error> { | ||
// Get the "memory" export of the module. | ||
// If the module does not export it, just panic, | ||
// since we are not going to be able to copy the data. | ||
let mut memory = instance | ||
.memory(MEMORY) | ||
.ok_or_else(|| anyhow!("Missing memory"))?; | ||
|
||
// The module is not using any bindgen libraries, | ||
// so it should export its own alloc function. | ||
// | ||
// Get the guest's exported alloc function, and call it with the | ||
// length of the byte array we are trying to copy. | ||
// The result is an offset relative to the module's linear memory, | ||
// which is used to copy the bytes into the module's memory. | ||
// Then, return the offset. | ||
let alloc = instance | ||
.func(ALLOC_FN) | ||
.ok_or_else(|| anyhow!("missing alloc"))?; | ||
|
||
let alloc_result = alloc.call(engine, [Val::I32(bytes.len() as i32).into()])?; | ||
|
||
let guest_ptr_offset = alloc_result[0].to_i32(); | ||
|
||
memory.write(bytes, guest_ptr_offset as u32)?; | ||
|
||
Ok(guest_ptr_offset) | ||
} |
Oops, something went wrong.