Skip to content

Commit

Permalink
refactor: improve modularity of smartengine (#3152)
Browse files Browse the repository at this point in the history
This PR is part of #3151 (see more context there), containing only trivial code reorganizations, without any actual changes.

To summarize:
- a new `engine` mod, corresponding to the feature `engine`. The implementations are moved to `engine::wasmtime_engine`.
- There are some reusable code not coupled with the engine. So I extracted them into a new module `config`.
  • Loading branch information
xxchan committed Apr 17, 2023
1 parent d0857c2 commit 5f8149d
Show file tree
Hide file tree
Showing 21 changed files with 141 additions and 129 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/fluvio-smartengine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-smartengine"
version = "0.7.2"
version = "0.7.3"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
Expand Down
73 changes: 73 additions & 0 deletions crates/fluvio-smartengine/src/engine/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use derive_builder::Builder;
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;

const DEFAULT_SMARTENGINE_VERSION: i16 = 17;

/// Initial seed data to passed, this will be send back as part of the output
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum SmartModuleInitialData {
None,
Aggregate { accumulator: Vec<u8> },
}

impl SmartModuleInitialData {
pub fn with_aggregate(accumulator: Vec<u8>) -> Self {
Self::Aggregate { accumulator }
}
}

impl Default for SmartModuleInitialData {
fn default() -> Self {
Self::None
}
}

/// SmartModule configuration
#[derive(Builder)]
pub struct SmartModuleConfig {
#[builder(default, setter(strip_option))]
pub(crate) initial_data: SmartModuleInitialData,
#[builder(default)]
pub(crate) params: SmartModuleExtraParams,
// this will be deprecated in the future
#[builder(default, setter(into, strip_option))]
pub(crate) version: Option<i16>,
}

impl SmartModuleConfigBuilder {
/// add initial parameters
pub fn param(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
let mut new = self;
let mut params = new.params.take().unwrap_or_default();
params.insert(key.into(), value.into());
new.params = Some(params);
new
}
}

impl SmartModuleConfig {
pub fn builder() -> SmartModuleConfigBuilder {
SmartModuleConfigBuilder::default()
}

pub(crate) fn version(&self) -> i16 {
self.version.unwrap_or(DEFAULT_SMARTENGINE_VERSION)
}
}

#[cfg(feature = "transformation")]
impl From<crate::transformation::TransformationStep> for SmartModuleConfig {
fn from(step: crate::transformation::TransformationStep) -> Self {
Self {
initial_data: SmartModuleInitialData::None,
params: step
.with
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect::<std::collections::BTreeMap<String, String>>()
.into(),
version: None,
}
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
17 changes: 17 additions & 0 deletions crates/fluvio-smartengine/src/engine/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
pub mod metrics;
/// SmartModule configuration
mod config;
pub use config::{
SmartModuleConfig, SmartModuleConfigBuilder, SmartModuleConfigBuilderError,
SmartModuleInitialData,
};
mod error;

#[cfg(test)]
mod fixture;

pub type WasmSlice = (i32, i32, u32);
pub type Version = i16;

mod wasmtime;
pub use self::wasmtime::{SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance};
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
use std::fmt::{self, Debug};

use anyhow::Result;
use derive_builder::Builder;
use tracing::debug;
use wasmtime::{Engine, Module};

use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleExtraParams, SmartModuleInput, SmartModuleOutput,
};
use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput};

use crate::init::SmartModuleInit;
use crate::instance::{SmartModuleInstance, SmartModuleInstanceContext};
use crate::SmartModuleConfig;

use crate::metrics::SmartModuleChainMetrics;
use crate::state::WasmState;
use crate::transforms::create_transform;
use super::init::SmartModuleInit;
use super::instance::{SmartModuleInstance, SmartModuleInstanceContext};

const DEFAULT_SMARTENGINE_VERSION: i16 = 17;
use super::metrics::SmartModuleChainMetrics;
use super::state::WasmState;
use super::transforms::create_transform;

#[derive(Clone)]
pub struct SmartEngine(Engine);
Expand Down Expand Up @@ -155,79 +152,10 @@ impl SmartModuleChainInstance {
}
}

/// Initial seed data to passed, this will be send back as part of the output
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum SmartModuleInitialData {
None,
Aggregate { accumulator: Vec<u8> },
}

impl SmartModuleInitialData {
pub fn with_aggregate(accumulator: Vec<u8>) -> Self {
Self::Aggregate { accumulator }
}
}

impl Default for SmartModuleInitialData {
fn default() -> Self {
Self::None
}
}

/// SmartModule configuration
#[derive(Builder)]
pub struct SmartModuleConfig {
#[builder(default, setter(strip_option))]
initial_data: SmartModuleInitialData,
#[builder(default)]
params: SmartModuleExtraParams,
// this will be deprecated in the future
#[builder(default, setter(into, strip_option))]
version: Option<i16>,
}

impl SmartModuleConfigBuilder {
/// add initial parameters
pub fn param(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
let mut new = self;
let mut params = new.params.take().unwrap_or_default();
params.insert(key.into(), value.into());
new.params = Some(params);
new
}
}

impl SmartModuleConfig {
pub fn builder() -> SmartModuleConfigBuilder {
SmartModuleConfigBuilder::default()
}

pub(crate) fn version(&self) -> i16 {
self.version.unwrap_or(DEFAULT_SMARTENGINE_VERSION)
}
}

#[cfg(feature = "transformation")]
impl From<crate::transformation::TransformationStep> for SmartModuleConfig {
fn from(step: crate::transformation::TransformationStep) -> Self {
Self {
initial_data: SmartModuleInitialData::None,
params: step
.with
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect::<std::collections::BTreeMap<String, String>>()
.into(),
version: None,
}
}
}

#[cfg(test)]
mod test {

use super::SmartModuleConfig;
use crate::SmartModuleConfig;

#[test]
fn test_param() {
Expand All @@ -247,15 +175,15 @@ mod chaining_test {

use fluvio_smartmodule::{dataplane::smartmodule::SmartModuleInput, Record};

use crate::{
use super::super::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData,
metrics::SmartModuleChainMetrics,
};

const SM_FILTER_INIT: &str = "fluvio_smartmodule_filter_init";
const SM_MAP: &str = "fluvio_smartmodule_map";

use crate::fixture::read_wasm_module;
use super::super::fixture::read_wasm_module;

#[ignore]
#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use fluvio_smartmodule::dataplane::smartmodule::{
};
use wasmtime::{AsContextMut, TypedFunc};

use crate::instance::SmartModuleInstanceContext;
use super::instance::SmartModuleInstanceContext;

pub(crate) const INIT_FN_NAME: &str = "init";
type WasmInitFn = TypedFunc<(i32, i32, u32), i32>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleExtraParams, SmartModuleInput, SmartModuleOutput, SmartModuleInitInput,
};

use crate::error::EngineError;
use crate::init::SmartModuleInit;
use crate::{WasmSlice, memory};
use crate::state::WasmState;
use super::error::EngineError;
use super::init::SmartModuleInit;
use super::{WasmSlice, memory};
use super::state::WasmState;

pub(crate) struct SmartModuleInstance {
ctx: SmartModuleInstanceContext,
Expand Down
File renamed without changes.
9 changes: 9 additions & 0 deletions crates/fluvio-smartengine/src/engine/wasmtime/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
pub(crate) mod memory;
pub(crate) mod transforms;
pub(crate) mod init;
pub(crate) mod state;
pub(crate) mod engine;
pub(crate) mod instance;
pub use engine::{SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance};

use super::*;
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use fluvio_smartmodule::dataplane::smartmodule::{
SmartModuleInput, SmartModuleOutput, SmartModuleAggregateInput, SmartModuleAggregateOutput,
SmartModuleTransformErrorStatus,
};
use crate::{
use crate::engine::SmartModuleInitialData;
use crate::engine::wasmtime::{
instance::{SmartModuleInstanceContext, SmartModuleTransform},
SmartModuleInitialData,
state::WasmState,
};

Expand Down Expand Up @@ -111,14 +111,14 @@ mod test {
Record,
};

use crate::{
use crate::engine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData,
metrics::SmartModuleChainMetrics,
};

const SM_AGGEGRATE: &str = "fluvio_smartmodule_aggregate";

use crate::fixture::read_wasm_module;
use crate::engine::fixture::read_wasm_module;

#[ignore]
#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ mod test {
Record,
};

use crate::{
use crate::engine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics,
transforms::simple_transform::ARRAY_MAP_FN_NAME,
wasmtime::transforms::simple_transform::ARRAY_MAP_FN_NAME,
};

const SM_ARRAY_MAP: &str = "fluvio_smartmodule_array_map_array";

use crate::fixture::read_wasm_module;
use crate::engine::fixture::read_wasm_module;

#[ignore]
#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ mod test {
Record,
};

use crate::{
use crate::engine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics,
transforms::simple_transform::FILTER_FN_NAME,
wasmtime::transforms::simple_transform::FILTER_FN_NAME,
};

const SM_FILTER: &str = "fluvio_smartmodule_filter";
const SM_FILTER_INIT: &str = "fluvio_smartmodule_filter_init";

use crate::fixture::read_wasm_module;
use crate::engine::fixture::read_wasm_module;

#[ignore]
#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ mod test {
Record,
};

use crate::{
use crate::engine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics,
transforms::simple_transform::FILTER_MAP_FN_NAME,
wasmtime::transforms::simple_transform::FILTER_MAP_FN_NAME,
};

const SM_FILTER_MAP: &str = "fluvio_smartmodule_filter_map";

use crate::fixture::read_wasm_module;
use crate::engine::fixture::read_wasm_module;

#[ignore]
#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ mod test {
Record,
};

use crate::{
use crate::engine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, metrics::SmartModuleChainMetrics,
transforms::simple_transform::MAP_FN_NAME,
wasmtime::transforms::simple_transform::MAP_FN_NAME,
};
use crate::fixture::read_wasm_module;
use crate::engine::fixture::read_wasm_module;

const SM_MAP: &str = "fluvio_smartmodule_map";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pub(crate) mod filter;
pub(crate) mod map;
pub(crate) mod array_map;
pub(crate) mod filter_map;
pub(crate) mod aggregate;
mod filter;
mod map;
mod array_map;
mod filter_map;
mod aggregate;
pub(crate) use instance::create_transform;
mod simple_transform;

Expand All @@ -11,11 +11,8 @@ mod instance {
use anyhow::{Result};
use wasmtime::AsContextMut;

use crate::{
instance::{SmartModuleInstanceContext, DowncastableTransform},
error::EngineError,
SmartModuleInitialData,
};
use crate::engine::{error::EngineError, SmartModuleInitialData};
use super::super::instance::{SmartModuleInstanceContext, DowncastableTransform};

use super::{
simple_transform::{
Expand Down
Loading

0 comments on commit 5f8149d

Please sign in to comment.