refactor!: impl aws launcher - part 2 #427

Merged (2 commits) on Jan 13, 2022

Changes from all commits
83 changes: 55 additions & 28 deletions flock-function/src/aws/mod.rs
@@ -15,7 +15,7 @@
//! library.

extern crate daggy;
use crate::launcher::Launcher;
use crate::launcher::{ExecutionMode, Launcher};
use async_trait::async_trait;
use daggy::NodeIndex;
use datafusion::arrow::record_batch::RecordBatch;
@@ -28,6 +28,7 @@ use flock::error::Result;
use flock::query::Query;
use flock::runtime::context::*;
use flock::runtime::plan::CloudExecutionPlan;
use log::debug;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
@@ -91,7 +92,7 @@ impl Launcher for AwsLambdaLauncher {
/// the results to the next lambda function. This greatly simplifies the
/// code size and complexity of the distributed query engine. Meanwhile, the
/// latency is significantly reduced.
async fn execute(&self) -> Result<Vec<RecordBatch>> {
async fn execute(&self, _: ExecutionMode) -> Result<Vec<RecordBatch>> {
unimplemented!();
}
}
@@ -114,37 +115,61 @@ impl AwsLambdaLauncher {
///
/// This function creates a new context for each query stage in the DAG.
fn create_cloud_contexts(&mut self) -> Result<()> {
debug!("Creating cloud contexts for both central and distributed query processing.");

// Creates the cloud contexts for the distributed mode
{
let dag = &mut self.dag;
let count = dag.node_count();
assert!(count < 100);

let concurrency = (0..count)
.map(|i| dag.get_node(NodeIndex::new(i)).unwrap().concurrency)
.collect::<Vec<usize>>();

(0..count).rev().for_each(|i| {
let node = dag.get_node_mut(NodeIndex::new(i)).unwrap();
let query_code = self.query_code.as_ref().expect("query code not set");

let next = if i == 0 {
CloudFunction::Sink(self.sink_type.clone())
} else if concurrency[i - 1 /* parent */] == 1 {
CloudFunction::Group((
format!("{}-{:02}", query_code, count - 1 - (i - 1)),
*FLOCK_FUNCTION_CONCURRENCY,
))
} else {
CloudFunction::Lambda(format!("{}-{:02}", query_code, count - 1 - (i - 1)))
};

let ctx = ExecutionContext {
plan: CloudExecutionPlan::new(node.stage.clone(), None),
name: format!("{}-{:02}", query_code, count - 1 - i),
next,
};

node.context = Some(ctx);
});
}

// Creates the cloud contexts for the centralized mode
{
let query_code = self.query_code.as_ref().expect("query code not set");

let _data_source_ctx = ExecutionContext {
plan: CloudExecutionPlan::new(vec![FLOCK_EMPTY_PLAN.clone()], None),
name: FLOCK_DATA_SOURCE_FUNC_NAME.clone(),
next: CloudFunction::Group((
format!("{}-{:02}", query_code, 0),
*FLOCK_FUNCTION_CONCURRENCY,
)),
};

let _worker_ctx = ExecutionContext {
// TODO: add option to store the execution plan in S3.
plan: CloudExecutionPlan::new(vec![self.plan.clone()], None),
name: format!("{}-{:02}", query_code, 0),
next: CloudFunction::Sink(self.sink_type.clone()),
};
}

Ok(())
}
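
To make the wiring concrete, here is a stand-alone sketch of the naming scheme implemented above; the `stage_names` helper is invented for illustration, while the real code stores an `ExecutionContext` in each DAG node. Node `i` becomes cloud function `{query_code}-{count - 1 - i}` (zero-padded to two digits), so function names run 00, 01, … in execution order, and each function's `next` is either the following number or the data sink:

```rust
// Invented helper: returns (function name, next function) pairs in the
// order the contexts are created, i.e. from the DAG leaves to the root.
fn stage_names(query_code: &str, count: usize) -> Vec<(String, String)> {
    (0..count)
        .rev()
        .map(|i| {
            let name = format!("{}-{:02}", query_code, count - 1 - i);
            let next = if i == 0 {
                "data sink".to_string()
            } else {
                format!("{}-{:02}", query_code, count - 1 - (i - 1))
            };
            (name, next)
        })
        .collect()
}

fn main() {
    // With three stages: 00 -> 01 -> 02 -> data sink.
    for (name, next) in stage_names("1234567890123456", 3) {
        println!("{} -> {}", name, next);
    }
}
```
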
@@ -163,6 +188,7 @@ mod tests {
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use flock::assert_batches_eq;
use flock::datasource::DataSource;
use flock::query::QueryType;
use flock::query::Table;
use std::sync::Arc;

@@ -192,6 +218,7 @@
DataSource::Memory,
DataSinkType::Blackhole,
None,
QueryType::OLAP,
))
}

4 changes: 2 additions & 2 deletions flock-function/src/azure/mod.rs
@@ -14,7 +14,7 @@
//! The `azure` crate contains the Azure-specific parts of the `flock-function`
//! library.

use crate::launcher::Launcher;
use crate::launcher::{ExecutionMode, Launcher};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch;
use flock::error::Result;
@@ -38,7 +38,7 @@ impl Launcher for AzureLauncher {
unimplemented!();
}

async fn execute(&self) -> Result<Vec<RecordBatch>> {
async fn execute(&self, _: ExecutionMode) -> Result<Vec<RecordBatch>> {
unimplemented!();
}
}
4 changes: 2 additions & 2 deletions flock-function/src/gcp/mod.rs
@@ -14,7 +14,7 @@
//! The `gcp` crate contains the GCP-specific parts of the `flock-function`
//! library.

use crate::launcher::Launcher;
use crate::launcher::{ExecutionMode, Launcher};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch;
use flock::error::Result;
@@ -38,7 +38,7 @@ impl Launcher for GCPLauncher {
unimplemented!();
}

async fn execute(&self) -> Result<Vec<RecordBatch>> {
async fn execute(&self, _: ExecutionMode) -> Result<Vec<RecordBatch>> {
unimplemented!();
}
}
17 changes: 16 additions & 1 deletion flock-function/src/launcher.rs
@@ -19,6 +19,18 @@ use datafusion::arrow::record_batch::RecordBatch;
use flock::error::Result;
use flock::query::Query;

/// The execution model for the query.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
/// In centralized mode, the query is executed on a single cloud
/// function.
Centralized,
/// In distributed mode, the query is represented as a DAG and
/// executed on multiple cloud functions.
Distributed,
}

/// Launcher is a trait that defines the interface for deploying and executing
/// queries on cloud function services.
#[async_trait]
@@ -39,7 +51,10 @@ pub trait Launcher {
/// Execute a query on a specific cloud function service.
/// It is called after the query is deployed.
///
/// # Arguments
/// `mode` - The execution mode of the query.
///
/// # Returns
/// A vector of record batches.
async fn execute(&self) -> Result<Vec<RecordBatch>>;
async fn execute(&self, mode: ExecutionMode) -> Result<Vec<RecordBatch>>;
}
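
For orientation, a minimal sketch (not part of this PR) of how a launcher implementation can dispatch on the new `mode` argument. `MockLauncher` and the `String`-based result type are invented stand-ins; the real trait returns DataFusion `RecordBatch`es:

```rust
use async_trait::async_trait;

// Local copy of the enum added in this PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    Centralized,
    Distributed,
}

#[async_trait]
pub trait Launcher {
    async fn execute(&self, mode: ExecutionMode) -> Result<Vec<String>, String>;
}

// Invented stand-in launcher for illustration only.
struct MockLauncher;

#[async_trait]
impl Launcher for MockLauncher {
    async fn execute(&self, mode: ExecutionMode) -> Result<Vec<String>, String> {
        match mode {
            // One cloud function executes the entire plan.
            ExecutionMode::Centralized => Ok(vec!["plan output".to_string()]),
            // Each DAG stage runs in its own cloud function.
            ExecutionMode::Distributed => Err("unsupported in this sketch".to_string()),
        }
    }
}

#[tokio::main]
async fn main() {
    println!("{:?}", MockLauncher.execute(ExecutionMode::Centralized).await);
}
```
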
16 changes: 13 additions & 3 deletions flock-function/src/local.rs
@@ -13,7 +13,7 @@

//! This crate is responsible for executing queries on the local machine.

use crate::launcher::Launcher;
use crate::launcher::{ExecutionMode, Launcher};
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
@@ -50,7 +50,8 @@ impl Launcher for LocalLauncher {
))
}

async fn execute(&self) -> Result<Vec<RecordBatch>> {
async fn execute(&self, mode: ExecutionMode) -> Result<Vec<RecordBatch>> {
assert!(mode == ExecutionMode::Centralized);
collect(self.execution_plan.clone())
.await
.map_err(|e| FlockError::Execution(e.to_string()))
@@ -115,6 +116,13 @@ impl LocalLauncher {
.for_each(|(i, _)| queue.push_back(p.children()[i].clone()));
}
}

/// Collects the results of the query.
pub async fn collect(&self) -> Result<Vec<RecordBatch>> {
collect(self.execution_plan.clone())
.await
.map_err(|e| FlockError::Execution(e.to_string()))
}
}
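
A simplified, synchronous sketch of the split introduced here (the real signatures are async and use DataFusion types): `execute` now guards on the execution mode, while `collect` gathers results without the check, which is what the updated test calls:

```rust
// Simplified stand-ins; the real code is async and returns RecordBatches.
#[allow(dead_code)]
#[derive(PartialEq)]
enum ExecutionMode {
    Centralized,
    Distributed,
}

struct LocalLauncher {
    // Stand-in for the collected output of the physical plan.
    results: Vec<String>,
}

impl LocalLauncher {
    // `execute` only supports centralized mode on the local machine.
    fn execute(&self, mode: ExecutionMode) -> Vec<String> {
        assert!(mode == ExecutionMode::Centralized);
        self.collect()
    }

    // `collect` gathers results without a mode check, as the updated test does.
    fn collect(&self) -> Vec<String> {
        self.results.clone()
    }
}

fn main() {
    let launcher = LocalLauncher { results: vec!["batch-0".to_string()] };
    println!("{:?}", launcher.execute(ExecutionMode::Centralized));
}
```
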

#[cfg(test)]
@@ -126,6 +134,7 @@ mod tests {
use flock::assert_batches_eq;
use flock::datasink::DataSinkType;
use flock::datasource::DataSource;
use flock::query::QueryType;
use flock::query::Table;

#[tokio::test]
@@ -153,6 +162,7 @@
DataSource::Memory,
DataSinkType::Blackhole,
None,
QueryType::OLAP,
);

let mut launcher = LocalLauncher::new(&query).await?;
@@ -185,7 +195,7 @@
)?;

launcher.feed_data_sources(&[vec![vec![batch]]]);
let batches = launcher.execute().await?;
let batches = launcher.collect().await?;

let expected = vec![
"+--------------------+--------------------+----------------------+",
1 change: 0 additions & 1 deletion flock/Cargo.toml
@@ -15,7 +15,6 @@ simd = [ "datafusion/simd" ]
async-trait = "0.1.42"
aws_lambda_events = "0.5"
base64 = "0.13.0"
blake2 = "0.9"
bytes = "1.0.1"
chrono = "0.4.19"
daggy = { git = "https://github.com/flock-lab/daggy", branch = "master" }
3 changes: 2 additions & 1 deletion flock/src/configs/flock.toml
@@ -48,7 +48,8 @@ target_batch_size = 16384
# Default raw record batch size in the payload (512 KB)
payload_batch_size = 524288

# Lambda function concurrency
# Size (i.e., concurrency) of a Lambda function group; each function in
# the group has a concurrency of 1
concurrency = 32

aggregate_threshold = 10485760
2 changes: 1 addition & 1 deletion flock/src/distributed_plan/stage.rs
@@ -352,7 +352,7 @@ fn build_query_dag_from_serde_json(plan: Arc<dyn ExecutionPlan>) -> Result<Query
dag.insert(
leaf,
vec![Value::Object(left_obj), Value::Object(right_obj)],
1,
*FLOCK_FUNCTION_CONCURRENCY,
)?;
return Ok(dag);
}
14 changes: 8 additions & 6 deletions flock/src/driver/funcgen/function.rs
@@ -21,11 +21,12 @@ use daggy::{NodeIndex, Walker};
use crate::datasource::DataSource;
use crate::driver::funcgen::dag::*;
use crate::prelude::*;
use blake2::{Blake2b, Digest};
use chrono::{DateTime, Utc};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_plan::ExecutionPlan;
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, VecDeque};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// `QueryFlow` contains all the context information of the current query
@@ -112,7 +113,9 @@ impl QueryFlow {
query: &dyn Query,
dag: &mut QueryDag,
) -> HashMap<NodeIndex, ExecutionContext> {
let mut query_code = base64::encode(&Blake2b::digest(query.sql().as_bytes()));
let mut hasher = DefaultHasher::new();
query.sql().hash(&mut hasher);
let mut query_code = hasher.finish().to_string();
query_code.truncate(16);
let timestamp = chrono::offset::Utc::now();

@@ -172,8 +175,6 @@ mod tests {
use datafusion::datasource::MemTable;
use datafusion::execution::context::ExecutionContext;

use blake2::{Blake2b, Digest};

async fn init_query_flow(sql: &str) -> Result<QueryFlow> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
@@ -325,8 +326,9 @@
async fn lambda_function_name() -> Result<()> {
// The hash of the SQL statement is used as the first 16 characters of the
// function name.
let hash = Blake2b::digest(b"SELECT b FROM t ORDER BY b ASC LIMIT 3");
let mut s1 = base64::encode(&hash);
let mut hasher = DefaultHasher::new();
b"SELECT b FROM t ORDER BY b ASC LIMIT 3".hash(&mut hasher);
let mut s1 = hasher.finish().to_string();
s1.truncate(16);

// The sub-plan index in the dag is used as the second 2 characters of the
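
A self-contained sketch of the new name derivation that replaces the dropped `blake2` dependency; `query_code` is an invented helper mirroring the hashing logic above:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Invented helper: derives the 16-character function-name prefix from the
/// SQL text. Note that `DefaultHasher`'s algorithm is only guaranteed to be
/// stable within a single Rust release.
fn query_code(sql: &str) -> String {
    let mut hasher = DefaultHasher::new();
    sql.hash(&mut hasher);
    let mut code = hasher.finish().to_string();
    code.truncate(16);
    code
}

fn main() {
    let code = query_code("SELECT b FROM t ORDER BY b ASC LIMIT 3");
    // Stage 0 of this query would run in a cloud function named "<code>-00".
    println!("{}-00", code);
}
```
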
2 changes: 1 addition & 1 deletion flock/src/prelude.rs
@@ -26,7 +26,7 @@ pub use crate::datasink::{DataSink, DataSinkFormat, DataSinkType};
pub use crate::datasource::{nexmark, tpch, ysb, DataSource, DataStream, RelationPartitions};
pub use crate::encoding::Encoding;
pub use crate::error::{FlockError, Result};
pub use crate::query::{Query, Table};
pub use crate::query::{Query, QueryType, StreamType, Table};
pub use crate::runtime::arena::{Arena, WindowSession};
pub use crate::runtime::context::{self, CloudFunction, ExecutionContext};
pub use crate::runtime::payload::{DataFrame, Payload, Uuid, UuidBuilder};
32 changes: 32 additions & 0 deletions flock/src/query.rs
@@ -42,6 +42,34 @@ impl<T: AsRef<str>> Debug for Table<T> {
}
}

/// The stream type for the query.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub enum StreamType {
/// Regular stream.
Regular,
/// NEXMark Benchmark.
NEXMarkBench,
/// Yahoo! Streaming Benchmark.
YSBBench,
}

/// The workload type of the query.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub enum QueryType {
/// OLAP workload.
OLAP,
/// Streaming workload.
Streaming(StreamType),
}

impl Default for QueryType {
fn default() -> Self {
QueryType::Streaming(StreamType::NEXMarkBench)
}
}

/// SQL queries in your application code execute over in-application batches.
#[derive(Debug, Default, Clone)]
pub struct Query<T: AsRef<str>> {
@@ -61,6 +89,8 @@ pub struct Query<T: AsRef<str>> {
/// the function name is generated from `sql`. To make the debugging easier,
/// we define human-readable function name for benchmarking.
pub query_code: Option<T>,
/// The query type.
pub query_type: QueryType,
}

impl<T: AsRef<str>> Query<T> {
@@ -71,13 +101,15 @@ impl<T: AsRef<str>> Query<T> {
datasource: DataSource,
datasink: DataSinkType,
query_code: Option<T>,
query_type: QueryType,
) -> Self {
Self {
sql,
tables,
datasource,
datasink,
query_code,
query_type,
}
}
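
A small usage sketch of the new enums (stand-alone copies of the definitions above); the `describe` helper is invented for illustration:

```rust
#[derive(Clone, Debug)]
pub enum StreamType {
    Regular,
    NEXMarkBench,
    YSBBench,
}

#[derive(Clone, Debug)]
pub enum QueryType {
    OLAP,
    Streaming(StreamType),
}

impl Default for QueryType {
    // NEXMark remains the default workload, matching the Default impl above.
    fn default() -> Self {
        QueryType::Streaming(StreamType::NEXMarkBench)
    }
}

/// Invented helper: maps a query type to a human-readable description.
fn describe(q: &QueryType) -> &'static str {
    match q {
        QueryType::OLAP => "one-shot analytical query",
        QueryType::Streaming(StreamType::Regular) => "regular data stream",
        QueryType::Streaming(StreamType::NEXMarkBench) => "NEXMark benchmark stream",
        QueryType::Streaming(StreamType::YSBBench) => "Yahoo! streaming benchmark",
    }
}

fn main() {
    for q in [
        QueryType::OLAP,
        QueryType::Streaming(StreamType::Regular),
        QueryType::Streaming(StreamType::YSBBench),
        QueryType::default(),
    ] {
        println!("{:?}: {}", q, describe(&q));
    }
}
```
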
