Skip to content

Commit

Permalink
fix: workflow of entire system (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
gangliao authored Apr 22, 2021
1 parent 05e3fe0 commit 9943a00
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 23 deletions.
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ driver = { path = "../src/driver" }
env_logger = "^0.8"
futures = "0.3"
lazy_static = "1.4"

log = "0.4.14"
runtime = { path = "../src/runtime" }
rusoto_core = "0.46.0"
rusoto_lambda = "0.46.0"
Expand Down
54 changes: 37 additions & 17 deletions benchmarks/nexmark/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use driver::deploy::lambda;
use lazy_static::lazy_static;
use log::info;
use nexmark::config::Config;
use nexmark::event::{Auction, Bid, Person};
use nexmark::NexMarkSource;
use runtime::prelude::*;
use rusoto_core::Region;
use rusoto_lambda::{
CreateFunctionRequest, InvocationRequest, InvocationResponse, Lambda, LambdaClient,
CreateFunctionRequest, GetFunctionRequest, InvocationRequest, InvocationResponse, Lambda,
LambdaClient,
};
use std::sync::Arc;
use structopt::StructOpt;
Expand Down Expand Up @@ -111,26 +113,30 @@ async fn benchmark(opt: NexmarkBenchmarkOpt) -> Result<()> {
};

// create lambda function based on the generic lambda function code on AWS S3.
let function_arn = create_lambda_function(&lambda_ctx).await?;
let _events = nexmark.generate_data()?;
let func_arn = create_lambda_function(&lambda_ctx).await?;
info!("[OK] Create lambda function {}.", func_arn);

let events = nexmark.generate_data()?;
info!("[OK] Generate nexmark events.");
// TODO:
// 1. out-of-order epoch-based stream processing
// 2. async io programming
//
// let events = events.select(0, 1).ok_or_else(|| {
// SquirtleError::Execution("Failed to select events for Nexmark
// benchmark.".to_string()) })?;

// serialization
// let en_events = serde_json::to_vec(&events).unwrap();

if let StreamWindow::None = nexmark.window {
let response = invoke_lambda_function(function_arn, vec![]).await?;
if opt.debug {
println!(
"{}",
serde_json::from_slice::<String>(&response.payload.unwrap())?
);
for i in 0..opt.seconds {
for j in 0..opt.generators {
let event = events.select(i, j).ok_or_else(|| {
SquirtleError::Execution("Event selection failed.".to_string())
})?;
info!("[OK] Send nexmark event (time: {}, source: {}).", i, j);
let response =
invoke_lambda_function(func_arn.clone(), serde_json::to_vec(&event)?).await?;
if opt.debug {
println!(
"{}",
serde_json::from_slice::<String>(&response.payload.unwrap())?
);
}
}
}
} else {
unimplemented!();
Expand Down Expand Up @@ -164,6 +170,20 @@ async fn invoke_lambda_function(

/// Creates a single lambda function using bootstrap.zip in Amazon S3.
async fn create_lambda_function(ctx: &ExecutionContext) -> Result<String> {
if let Ok(reponse) = LAMBDA_CLIENT
.get_function(GetFunctionRequest {
function_name: ctx.name.clone(),
..Default::default()
})
.await
{
if let Some(config) = reponse.configuration {
return config.function_arn.ok_or_else(|| {
SquirtleError::Internal("Lambda function arn dones't exist.".to_string())
});
}
}

match LAMBDA_CLIENT
.create_function(CreateFunctionRequest {
code: lambda::nexmark_function_code(),
Expand Down
5 changes: 3 additions & 2 deletions src/bin/cli/src/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,9 @@ pub fn put_object_to_s3(bucket: &str, key: &str, obj_path: &str) -> Result<(), E
let fname = Path::new(obj_path).parent().unwrap().join("bootstrap.zip");
let w = std::fs::File::create(&fname)?;
let mut zip = zip::ZipWriter::new(w);
let options =
zip::write::FileOptions::default().compression_method(zip::CompressionMethod::Bzip2);
let options = zip::write::FileOptions::default()
.compression_method(zip::CompressionMethod::Bzip2)
.unix_permissions(777);
zip.start_file("bootstrap", options)?;
zip.write_all(&fs::read(&obj_path)?)?;
zip.finish()?;
Expand Down
6 changes: 3 additions & 3 deletions src/driver/src/deploy/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub fn function_code() -> FunctionCode {
FunctionCode {
s3_bucket: Some(LAMBDA_DEPLOYMENT_PACKAGE.s3_bucket.to_owned()),
s3_key: Some(LAMBDA_DEPLOYMENT_PACKAGE.s3_key.to_owned()),
s3_object_version: Some(LAMBDA_DEPLOYMENT_PACKAGE.s3_object_version.to_owned()),
s3_object_version: None,
zip_file: None,
image_uri: None,
}
Expand All @@ -119,7 +119,7 @@ pub fn nexmark_function_code() -> FunctionCode {
FunctionCode {
s3_bucket: Some(NEXMARK_DEPLOYMENT_PACKAGE.s3_bucket.to_owned()),
s3_key: Some(NEXMARK_DEPLOYMENT_PACKAGE.s3_key.to_owned()),
s3_object_version: Some(NEXMARK_DEPLOYMENT_PACKAGE.s3_object_version.to_owned()),
s3_object_version: None,
zip_file: None,
image_uri: None,
}
Expand Down Expand Up @@ -186,7 +186,7 @@ pub fn runtime() -> Option<String> {
/// function. The format includes the file name. It can also include namespaces
/// and other qualifiers, depending on the runtime.
pub fn handler() -> Option<String> {
Some("doesn't matter".to_owned())
Some("handler".to_owned())
}

/// The amount of memory that your function has access to. Increasing the
Expand Down
4 changes: 4 additions & 0 deletions src/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ path = "src/aws/lambda.rs"
[[bin]]
name = "nexmark_lambda"
path = "src/aws/nexmark/lambda.rs"

[[bin]]
name = "hello"
path = "src/aws/hello.rs"
48 changes: 48 additions & 0 deletions src/function/src/aws/hello.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2020 UMD Database Group. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use lambda_runtime::{handler_fn, Context};
use serde_json::Value;

type Error = Box<dyn std::error::Error + Sync + Send + 'static>;

async fn handler(event: Value, _: Context) -> Result<Value, Error> {
Ok(event)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
lambda_runtime::run(handler_fn(handler)).await?;
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;

#[tokio::test]
async fn handler_handles() {
let event = json!({
"answer": 42
});

assert_eq!(
handler(event.clone(), Context::default())
.await
.expect("expected Ok(_) value"),
event
)
}
}

0 comments on commit 9943a00

Please sign in to comment.