Skip to content

Commit

Permalink
Address code review comments from #4
Browse files Browse the repository at this point in the history
  • Loading branch information
xianwill committed Mar 18, 2021
1 parent c525c07 commit 4204571
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 35 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ clap = { version = ">=3.0.0-beta.2,<4", features = ["color"] }
crossbeam = { version = "0" }
env_logger = "0"
futures = "0.3"
# The "sync" feature is undocumented but required in order to swap Rc for Arc
# in the crate, allowing it to be used with futures and threads properly
jmespath = { git = "https://github.com/jmespath/jmespath.rs", features = ["sync"] }
# The "sync" feature is undocumented but required in order to swap Rc for Arc
# in the crate, allowing it to be used with futures and threads properly
# jmespath = { git = "https://github.com/jmespath/jmespath.rs", features = ["sync"] }
jmespatch = { version = "0.3", features = ["sync"] }
lazy_static = "1"
log = "0"
rdkafka = "~0.25.0"
Expand Down
2 changes: 1 addition & 1 deletion README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ link:https://github.com/delta-io/delta-rs[delta-rs] bindings.
* Run kafka-delta-ingest (with a short 10s allowed_latency):

```bash
RUST_LOG=debug cargo run ingest example ./tests/data/example --allowed_latency 10 -t "modified_date: substr(modified,'0','10') "kafka_offset: kafka.offset"
RUST_LOG=debug cargo run ingest example ./tests/data/example --allowed_latency 10 -t 'modified_date: substr(modified,`0`,`10`)' 'kafka_offset: kafka.offset'
```
* In separate shell, produce messages to `example` topic, e.g.:

Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() -> anyhow::Result<()> {
(@arg ALLOWED_LATENCY: -l --allowed_latency +takes_value default_value("300") "The allowed latency (in seconds) from the time a message is consumed to when it should be written to Delta.")
(@arg MAX_MESSAGES_PER_BATCH: -m --max_messages_per_batch +takes_value default_value("5000") "The maximum number of rows allowed in a parquet row group. This should approximate the number of bytes described by MIN_BYTES_PER_FILE.")
(@arg MIN_BYTES_PER_FILE: -b --min_bytes_per_file +takes_value default_value("134217728") "The target minimum file size (in bytes) for each Delta file. File size may be smaller than this value if ALLOWED_LATENCY does not allow enough time to accumulate the specified number of bytes.")
(@arg TRANSFORM: -t --transform +multiple +takes_value "A list of transforms to apply to each Kafka message.")
(@arg TRANSFORM: -t --transform +multiple +takes_value "A list of transforms to apply to each Kafka message. Each transform should follow the pattern \"property:query\". For example `-t \"modified_date:substr(modified,`0`,`10`)\" \"kafka_offset:kafka.offset\"`.")
)
)
.setting(AppSettings::SubcommandRequiredElseHelp)
Expand Down
115 changes: 90 additions & 25 deletions src/transforms.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use jmespath::{
use jmespatch::{
functions::{ArgumentType, CustomFunction, Signature},
Context, Expression, Rcvar, Runtime,
Context, ErrorReason, Expression, JmespathError, Rcvar, Runtime, RuntimeError, Variable,
};
use rdkafka::Message;
use serde_json::Value;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

#[derive(thiserror::Error, Debug)]
Expand All @@ -15,7 +16,7 @@ pub enum TransformError {
#[error("JmespathError: {source}")]
JmesPath {
#[from]
source: jmespath::JmespathError,
source: JmespathError,
},

#[error("serde_json::Error: {source}")]
Expand All @@ -25,16 +26,52 @@ pub enum TransformError {
},
}

// Error thrown from custom functions registered in the jmespath Runtime
struct InvalidTypeError {
expression: String,
offset: usize,
expected: String,
actual: String,
position: usize,
}

// From impl for InvalidTypeError so we can return these from within custom functions
impl From<InvalidTypeError> for JmespathError {
fn from(err: InvalidTypeError) -> Self {
JmespathError::new(
err.expression.as_str(),
err.offset,
ErrorReason::Runtime(RuntimeError::InvalidType {
expected: err.expected,
actual: err.actual,
position: err.position,
}),
)
}
}

impl InvalidTypeError {
fn new(context: &Context, expected: &str, actual: String, position: usize) -> Self {
Self {
expression: context.expression.to_owned(),
offset: context.offset,
expected: expected.to_owned(),
actual: actual.to_string(),
position: position,
}
}
}

lazy_static! {
pub(crate) static ref TRANSFORM_RUNTIME: Runtime = {
static ref TRANSFORM_RUNTIME: Runtime = {
let mut runtime = Runtime::new();
runtime.register_builtin_functions();
runtime.register_function("substr", Box::new(custom_substr()));
runtime.register_function("substr", Box::new(create_substr_fn()));
runtime
};
}

pub(crate) fn compile_transforms(
pub fn compile_transforms(
definitions: &HashMap<String, String>,
) -> Result<HashMap<String, MessageTransform>, TransformError> {
let mut transforms = HashMap::new();
Expand All @@ -47,7 +84,6 @@ pub(crate) fn compile_transforms(
"kafka.timestamp" => MessageTransform::KafkaMetaTransform(KafkaMetaProperty::Timestamp),
_ => {
let expression = TRANSFORM_RUNTIME.compile(v.as_str())?;
let expression = expression;

MessageTransform::ExpressionTransform(expression)
}
Expand All @@ -59,25 +95,58 @@ pub(crate) fn compile_transforms(
Ok(transforms)
}

fn custom_substr() -> CustomFunction {
// Returns a Jmespath CustomFunction for selecting substrings from a string.
// This function can be registered and used within a Jmespath runtime.
//
// Logically the function signature in Rust would be:
//
// ```
// fn substr(path: Expression, skip: i32, take: i32) -> Value;
// ```
//
// For example given the object:
//
// ```
// {
// "name": "William"
// }
// ```
//
// and the expression:
//
// ```
// substr(name,`0`,`4`)
// ```
//
// the value returned will be "Will"
//
fn create_substr_fn() -> CustomFunction {
CustomFunction::new(
Signature::new(
vec![
ArgumentType::String,
ArgumentType::String,
ArgumentType::String,
ArgumentType::Number,
ArgumentType::Number,
],
None,
),
Box::new(|args: &[Rcvar], _context: &mut Context| {
let s = args[0].as_string().unwrap();

let start = args[1].as_string().unwrap().parse::<usize>().unwrap();
let end = args[2].as_string().unwrap().parse::<usize>().unwrap();
Box::new(|args: &[Rcvar], context: &mut Context| {
let s = args[0].as_string().ok_or_else(|| {
InvalidTypeError::new(context, "string", args[0].get_type().to_string(), 0)
})?;

let start = args[1].as_number().ok_or_else(|| {
InvalidTypeError::new(context, "number", args[0].get_type().to_string(), 1)
})? as usize;
let end = args[2].as_number().ok_or_else(|| {
InvalidTypeError::new(context, "number", args[0].get_type().to_string(), 2)
})? as usize;

let s: String = s.chars().skip(start).take(end).collect();

let var = jmespath::Variable::from(serde_json::Value::String(s));
let val = serde_json::Value::String(s);

let var = Variable::try_from(val)?;

Ok(Arc::new(var))
}),
Expand All @@ -96,24 +165,20 @@ pub enum MessageTransform {
ExpressionTransform(Expression<'static>),
}

pub(crate) struct TransformContext {
pub struct TransformContext {
transforms: HashMap<String, MessageTransform>,
}

impl TransformContext {
pub(crate) fn new(transforms: HashMap<String, MessageTransform>) -> Self {
pub fn new(transforms: HashMap<String, MessageTransform>) -> Self {
Self { transforms }
}

pub(crate) fn transform<M>(
&self,
value: &mut Value,
kafka_message: &M,
) -> Result<(), TransformError>
pub fn transform<M>(&self, value: &mut Value, kafka_message: &M) -> Result<(), TransformError>
where
M: Message,
{
let data = jmespath::Variable::from(value.clone());
let data = Variable::try_from(value.clone())?;

match value.as_object_mut() {
Some(map) => {
Expand Down Expand Up @@ -180,7 +245,7 @@ mod tests {

transforms.insert(
"modified_date".to_string(),
"substr(modified, '0', '10')".to_string(),
"substr(modified, `0`, `10`)".to_string(),
);

let expressions = compile_transforms(&transforms).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn e2e_smoke_test() {
let mut transforms = HashMap::new();
transforms.insert(
"modified_date".to_string(),
"substr(modified, '0', '10')".to_string(),
"substr(modified, `0`, `10`)".to_string(),
);
transforms.insert("_kafka_offset".to_string(), "kafka.offset".to_string());

Expand Down

0 comments on commit 4204571

Please sign in to comment.