Skip to content

Commit

Permalink
lambda: netmark data source on cloud (#246)
Browse files Browse the repository at this point in the history
* lambda: netmark data source on cloud

* fix: ignore cloud test
  • Loading branch information
gangliao authored Apr 17, 2021
1 parent fcd2096 commit 437144d
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 159 deletions.
81 changes: 26 additions & 55 deletions src/runtime/src/datasource/nexmark/nexmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,36 +54,15 @@ pub struct NexMarkStream {
#[derive(Debug, Default, Serialize, Deserialize, Abomonation, PartialEq, Eq)]
pub struct NexMarkEvent {
/// The encoded Person events.
pub persons: Vec<u8>,
pub persons: Vec<u8>,
/// The encoded Auction events.
pub auctions: Vec<u8>,
pub auctions: Vec<u8>,
/// The encoded Bid events.
pub bids: Vec<u8>,
/// The number of Person events.
pub num_persons: usize,
/// The number of Auction events.
pub num_auctions: usize,
/// The number of Bid events.
pub num_bids: usize,
pub bids: Vec<u8>,
/// The logical timestamp for the current epoch.
pub epoch: usize,
pub epoch: usize,
/// The data source identifier.
pub source: usize,
}

impl NexMarkEvent {
/// Encoding NexMarkEvent to bytes for network transmission.
pub fn encode(&self, encoding: Encoding) -> Vec<u8> {
let mut bytes = Vec::new();
unsafe {
encode(self, &mut bytes).unwrap();
}
if encoding != Encoding::None {
bytes = encoding.compress(&bytes);
}

bytes
}
pub source: usize,
}

impl NexMarkStream {
Expand All @@ -106,27 +85,24 @@ impl NexMarkStream {
let epoch = Epoch::new(time);

if let Some(map) = self.persons.get(&epoch) {
if let Some((persons, num_persons)) = map.get(&source) {
if let Some((persons, _)) = map.get(&source) {
event.persons = persons.clone();
event.num_persons = *num_persons;
}
}

if let Some(map) = self.auctions.get(&epoch) {
if let Some((auctions, num_auctions)) = map.get(&source) {
if let Some((auctions, _)) = map.get(&source) {
event.auctions = auctions.clone();
event.num_auctions = *num_auctions;
}
}

if let Some(map) = self.bids.get(&epoch) {
if let Some((bids, num_bids)) = map.get(&source) {
if let Some((bids, _)) = map.get(&source) {
event.bids = bids.clone();
event.num_bids = *num_bids;
}
}

if event.num_persons == 0 && event.num_auctions == 0 && event.num_bids == 0 {
if event.persons.is_empty() && event.auctions.is_empty() && event.bids.is_empty() {
return None;
}

Expand Down Expand Up @@ -336,30 +312,25 @@ mod test {
let events = events.select(0, 1).unwrap();

// serialization and compression
let en_events = events.encode(Encoding::Zstd);
let values = serde_json::to_value(events).unwrap();

// decompression and deserialization
unsafe {
let mut bytes = Encoding::Zstd.decompress(&en_events);
if let Some((de_events, _)) = decode::<NexMarkEvent>(&mut bytes) {
assert_eq!(events, *de_events);

let person_schema = Arc::new(Person::schema());
let batches = NexMarkSource::to_batch(&(*de_events).persons, person_schema);
let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
println!("{}", formatted);

let auction_schema = Arc::new(Auction::schema());
let batches = NexMarkSource::to_batch(&(*de_events).auctions, auction_schema);
let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
println!("{}", formatted);

let bid_schema = Arc::new(Bid::schema());
let batches = NexMarkSource::to_batch(&(*de_events).bids, bid_schema);
let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
println!("{}", formatted);
}
}
let de_events: NexMarkEvent = serde_json::from_value(values).unwrap();

let person_schema = Arc::new(Person::schema());
let batches = NexMarkSource::to_batch(&de_events.persons, person_schema);
let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
println!("{}", formatted);

let auction_schema = Arc::new(Auction::schema());
let batches = NexMarkSource::to_batch(&de_events.auctions, auction_schema);
let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
println!("{}", formatted);

let bid_schema = Arc::new(Bid::schema());
let batches = NexMarkSource::to_batch(&de_events.bids, bid_schema);
let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
println!("{}", formatted);

Ok(())
}
Expand Down
12 changes: 7 additions & 5 deletions src/test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,33 @@ keywords = [ "testing", "utility", "fake", "dataset" ]
edition = "2018"

[dependencies]
abomonation = { git = "https://github.com/TimelyDataflow/abomonation", branch = "master" }
arrow = { git = "https://github.com/DSLAM-UMD/arrow", branch = "scq", features = [ "simd" ] }
base64 = "0.13.0"
chrono = { version = "0.4.19", features = [ "serde" ] }
daggy = "0.7.0"
datafusion = { git = "https://github.com/DSLAM-UMD/arrow", branch = "scq", features = [ "simd" ] }
driver = { path = "../driver" }
fake = { version = "2.4", features = [ 'derive', 'chrono' ] }
lambda_runtime = { git = "https://github.com/awslabs/aws-lambda-rust-runtime/", branch = "master" }

rand = "0.8.3"
runtime = { path = "../runtime" }
serde = { version = "1.0", features = [ "derive" ] }
serde_bytes = "0.11"
serde_json = "1.0"

tokio = { version = "1.2", features = [ "macros", "io-util", "sync", "rt-multi-thread" ] }
umd_lambda_runtime = { git = "https://github.com/DSLAM-UMD/aws-lambda-rust-runtime", branch = "scq" }

[dev-dependencies]
aws_lambda_events = "0.4"
rusoto_core = "0.46.0"

rusoto_lambda = "0.46.0"

[lib]
name = "test_utils"
path = "src/lib.rs"
crate-type = [ "lib" ]

[[bin]]
name = "test_umd_runtime"
path = "src/umd_runtime.rs"
name = "test_nextmark"
path = "src/nextmark.rs"
131 changes: 131 additions & 0 deletions src/test/src/nextmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2020 UMD Database Group. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use lambda_runtime::{handler_fn, Context};
use nexmark::event::{Auction, Bid, Person};
use nexmark::{NexMarkEvent, NexMarkSource};
use runtime::prelude::*;
use std::sync::Arc;

/// Lambda handler that dumps the contents of a NexMark event to stdout.
///
/// Each of the event's three encoded payloads (`persons`, `auctions`, `bids`)
/// is passed to `NexMarkSource::to_batch` together with the schema of the
/// matching event type, and the resulting batches are printed as a
/// pretty-formatted table. The incoming event is returned unchanged.
///
/// # Panics
///
/// Panics if pretty-formatting any of the batches fails.
async fn handler(event: NexMarkEvent, _: Context) -> Result<NexMarkEvent> {
    // Person events.
    let person_batches = NexMarkSource::to_batch(&event.persons, Arc::new(Person::schema()));
    println!(
        "{}",
        arrow::util::pretty::pretty_format_batches(&person_batches).unwrap()
    );

    // Auction events.
    let auction_batches = NexMarkSource::to_batch(&event.auctions, Arc::new(Auction::schema()));
    println!(
        "{}",
        arrow::util::pretty::pretty_format_batches(&auction_batches).unwrap()
    );

    // Bid events.
    let bid_batches = NexMarkSource::to_batch(&event.bids, Arc::new(Bid::schema()));
    println!(
        "{}",
        arrow::util::pretty::pretty_format_batches(&bid_batches).unwrap()
    );

    // Echo the event back to the caller.
    Ok(event)
}

/// Binary entry point: wraps `handler` with `handler_fn` and hands it to the
/// Lambda runtime, propagating any error the runtime returns.
#[tokio::main]
async fn main() -> Result<()> {
    let service = handler_fn(handler);
    lambda_runtime::run(service).await?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use nexmark::config::Config;
    use rusoto_core::Region;
    use rusoto_lambda::{InvocationRequest, Lambda, LambdaClient};

    /// Builds a NexMark source configured with 10 threads, 1 second and
    /// 100 events per second, generates its data, and returns the event
    /// obtained from `select(0, 1)`.
    ///
    /// Panics if `select(0, 1)` yields no event.
    fn sample_event() -> Result<NexMarkEvent> {
        let mut config = Config::new();
        let settings = [("threads", 10), ("seconds", 1), ("events-per-second", 100)];
        for &(key, value) in settings.iter() {
            config.insert(key, value.to_string());
        }

        let source = NexMarkSource {
            config,
            ..Default::default()
        };
        let stream = source.generate_data()?;
        Ok(stream.select(0, 1).unwrap())
    }

    /// Prints the person, auction and bid batches carried by `event` as
    /// pretty-formatted tables, in that order.
    fn print_tables(event: &NexMarkEvent) {
        let person_batches = NexMarkSource::to_batch(&event.persons, Arc::new(Person::schema()));
        println!(
            "{}",
            arrow::util::pretty::pretty_format_batches(&person_batches).unwrap()
        );

        let auction_batches =
            NexMarkSource::to_batch(&event.auctions, Arc::new(Auction::schema()));
        println!(
            "{}",
            arrow::util::pretty::pretty_format_batches(&auction_batches).unwrap()
        );

        let bid_batches = NexMarkSource::to_batch(&event.bids, Arc::new(Bid::schema()));
        println!(
            "{}",
            arrow::util::pretty::pretty_format_batches(&bid_batches).unwrap()
        );
    }

    /// Runs the handler in-process on a generated event and prints the
    /// tables of the event it returns.
    #[tokio::test]
    async fn umd_lambda_serialization() -> Result<()> {
        let events = sample_event()?;

        let ret_events = handler(events, Context::default()).await?;
        print_tables(&ret_events);

        Ok(())
    }

    /// Sends a generated event as JSON to the deployed Lambda function and
    /// prints the tables of the event decoded from the response payload.
    ///
    /// Ignored by default because it invokes a function through the AWS
    /// Lambda client in `us-east-1`.
    #[tokio::test]
    #[ignore]
    async fn umd_serde_on_cloud() -> Result<()> {
        let events = sample_event()?;

        // Serialize the event to JSON bytes for the invocation payload.
        let en_events = serde_json::to_vec(&events).unwrap();

        let client = LambdaClient::new(Region::UsEast1);
        let response = client
            .invoke(InvocationRequest {
                function_name: "arn:aws:lambda:us-east-1:942368842860:function:umd_runtime"
                    .to_string(),
                payload: Some(en_events.into()),
                invocation_type: Some("RequestResponse".to_string()),
                ..Default::default()
            })
            .await
            .unwrap();

        // Deserialize the JSON response payload back into an event.
        let de_events: NexMarkEvent =
            serde_json::from_slice(&response.payload.unwrap()).unwrap();
        print_tables(&de_events);

        Ok(())
    }
}
99 changes: 0 additions & 99 deletions src/test/src/umd_runtime.rs

This file was deleted.

0 comments on commit 437144d

Please sign in to comment.