Skip to content

Commit

Permalink
Added avro validation
Browse files Browse the repository at this point in the history
  • Loading branch information
jefbarn committed Jul 28, 2021
1 parent cb8dec4 commit 230a467
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pgx-macros = "0.1.20"
serde_json = "1.0.59"
jsonschema = "0.12.0"
jtd = "0.3"
avro-rs = "0.13.0"

[dev-dependencies]
pgx-tests = "0.1.20"
Expand Down
1 change: 1 addition & 0 deletions sql/load-order.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
json_schema.generated.sql
json_type_def.generated.sql
avro.generated.sql
99 changes: 99 additions & 0 deletions src/avro.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use avro_rs::types::Record;
use avro_rs::{Schema, Writer};
use pgx::*;

#[pg_extern]
fn avro_is_valid(schema: JsonB, instance: JsonB) -> bool {
let parsed_schema =
Schema::parse(&schema.0).unwrap_or_else(|e| panic!("Error parsing schema: {:?}", e));
let mut writer = Writer::new(&parsed_schema, Vec::new());

let val;
if schema.0["type"] == "record" {
let mut record = Record::new(&parsed_schema).unwrap();
for (k, v) in instance.0.as_object().unwrap() {
record.put(k, avro_rs::types::Value::from(v.clone()))
}
val = avro_rs::types::Value::from(record)
} else {
val = avro_rs::types::Value::from(instance.0);
}

writer.append(val).is_ok()
}

#[cfg(any(test, feature = "pg_test"))]
mod tests {
use pgx::*;
use std::panic;

#[pg_test]
fn test_avro_is_valid_bad_schema() {
let result = panic::catch_unwind(|| {
let _valid = Spi::get_one::<bool>(
r#"
select avro_is_valid('{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "number", "default": 42},
{"name": "b", "type": "string"}
]
}'::jsonb, '{
"a": 42,
"b": "foo"
}'::jsonb)"#,
);
});
assert!(result.is_err());
}

#[pg_test]
fn test_avro_is_valid_true() {
let valid = Spi::get_one::<bool>(
r#"
select avro_is_valid('{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}'::jsonb, '{
"a": 27,
"b": "foo"
}'::jsonb)"#,
);
assert_eq!(valid, Some(true))
}

#[pg_test]
fn test_avro_is_valid_false() {
let valid = Spi::get_one::<bool>(
r#"
select avro_is_valid('{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}'::jsonb, '{
"a": "27",
"b": "foo"
}'::jsonb)"#,
);
assert_eq!(valid, Some(false))
}

// #[pg_test]
// fn test_avro_get_errors() {
// let (_value, description) = Spi::get_two::<JsonB, String>(
// "select * from avro_get_errors('{\"maxLength\": 5}', '\"foobar\"'::jsonb)",
// );
// assert_eq!(
// description,
// Some("\"foobar\" is longer than 5 characters".to_string())
// )
// }
}
4 changes: 2 additions & 2 deletions src/json_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ fn json_schema_get_errors(
name!(schema_path, String),
),
> {
let compiled = JSONSchema::compile(&schema.0)
let parsed_schema = JSONSchema::compile(&schema.0)
.unwrap_or_else(|err| panic!("Error compiling schema: {:#?}", err));

let result = compiled
let result = parsed_schema
.validate(&instance.0)
.err()
.into_iter()
Expand Down
20 changes: 8 additions & 12 deletions src/json_type_def.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use jtd::Schema;
use pgx::*;
use serde_json::json;

#[pg_extern]
fn jtd_is_valid(schema: JsonB, instance: JsonB) -> bool {
let schema = Schema::from_serde_schema(serde_json::from_value(schema.0).unwrap()).unwrap();
let result = jtd::validate(&schema, &instance.0, Default::default()).unwrap();
let parsed_schema =
Schema::from_serde_schema(serde_json::from_value(schema.0).unwrap()).unwrap();
let result = jtd::validate(&parsed_schema, &instance.0, Default::default()).unwrap();

result.is_empty()
}
Expand All @@ -14,16 +14,12 @@ fn jtd_is_valid(schema: JsonB, instance: JsonB) -> bool {
fn jtd_get_errors(
schema: JsonB,
instance: JsonB,
) -> impl std::iter::Iterator<
Item = (
name!(instance_path, String),
name!(schema_path, String),
),
> {
let schema = Schema::from_serde_schema(serde_json::from_value(schema.0).unwrap()).unwrap();
let result = jtd::validate(&schema, &instance.0, Default::default()).unwrap();
) -> impl std::iter::Iterator<Item = (name!(instance_path, String), name!(schema_path, String))> {
let parsed_schema =
Schema::from_serde_schema(serde_json::from_value(schema.0).unwrap()).unwrap();
let result = jtd::validate(&parsed_schema, &instance.0, Default::default()).unwrap();

let new: Vec<(JsonB, String, String, String, String)> = result
let new: Vec<(String, String)> = result
.into_iter()
.map(|e| {
let (instance_path, schema_path) = e.into_owned_paths();
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod json_schema;
mod json_type_def;
mod avro;


use pgx::*;
Expand Down

0 comments on commit 230a467

Please sign in to comment.