feat!: add local launcher interface #425

Merged 1 commit on Jan 12, 2022
1 change: 1 addition & 0 deletions flock-function/Cargo.toml
@@ -12,6 +12,7 @@ snmalloc = [ "snmalloc-rs" ]
 simd = [ "datafusion/simd" ]

 [dependencies]
+async-trait = "0.1.42"
 aws_lambda_events = "0.5"
 base64 = "0.13.0"
 bytes = "1.1.0"
10 changes: 8 additions & 2 deletions flock-function/src/aws/mod.rs
@@ -15,23 +15,29 @@
 //! library.

 use crate::launcher::Launcher;
+use async_trait::async_trait;
+use datafusion::arrow::record_batch::RecordBatch;
 use flock::error::Result;
 use flock::query::Query;

 /// AwsLambdaLauncher defines the interface for deploying and executing
 /// queries on AWS Lambda.
 pub struct AwsLambdaLauncher {}

+#[async_trait]
 impl Launcher for AwsLambdaLauncher {
-    fn new(_query: &Query) -> Self {
+    fn new<T>(_query: &Query<T>) -> Self
+    where
+        T: AsRef<str> + Send + Sync + 'static,
+    {
         AwsLambdaLauncher {}
     }

     fn deploy(&self) -> Result<()> {
         unimplemented!();
     }

-    fn execute(&self) -> Result<()> {
+    async fn execute(&self) -> Result<Vec<RecordBatch>> {
         unimplemented!();
     }
 }
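
A note on the new attribute, not stated in the diff itself: stable Rust at this time does not support `async fn` in traits directly, so `#[async_trait]` rewrites the async method into an ordinary one returning a boxed future, roughly along these lines (illustrative only):

    // Approximate expansion of `async fn execute` under #[async_trait]:
    // fn execute<'a>(&'a self)
    //     -> Pin<Box<dyn Future<Output = Result<Vec<RecordBatch>>> + Send + 'a>>;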
10 changes: 8 additions & 2 deletions flock-function/src/azure/mod.rs
@@ -15,23 +15,29 @@
 //! library.

 use crate::launcher::Launcher;
+use async_trait::async_trait;
+use datafusion::arrow::record_batch::RecordBatch;
 use flock::error::Result;
 use flock::query::Query;

 /// AzureLauncher defines the interface for deploying and executing
 /// queries on Azure Functions.
 pub struct AzureLauncher {}

+#[async_trait]
 impl Launcher for AzureLauncher {
-    fn new(_query: &Query) -> Self {
+    fn new<T>(_query: &Query<T>) -> Self
+    where
+        T: AsRef<str> + Send + Sync + 'static,
+    {
         AzureLauncher {}
     }

     fn deploy(&self) -> Result<()> {
         unimplemented!();
     }

-    fn execute(&self) -> Result<()> {
+    async fn execute(&self) -> Result<Vec<RecordBatch>> {
         unimplemented!();
     }
 }
10 changes: 8 additions & 2 deletions flock-function/src/gcp/mod.rs
@@ -15,23 +15,29 @@
 //! library.

 use crate::launcher::Launcher;
+use async_trait::async_trait;
+use datafusion::arrow::record_batch::RecordBatch;
 use flock::error::Result;
 use flock::query::Query;

 /// GCPLauncher defines the interface for deploying and executing
 /// queries on GCP Functions.
 pub struct GCPLauncher {}

+#[async_trait]
 impl Launcher for GCPLauncher {
-    fn new(_query: &Query) -> Self {
+    fn new<T>(_query: &Query<T>) -> Self
+    where
+        T: AsRef<str> + Send + Sync + 'static,
+    {
         GCPLauncher {}
     }

     fn deploy(&self) -> Result<()> {
         unimplemented!();
     }

-    fn execute(&self) -> Result<()> {
+    async fn execute(&self) -> Result<Vec<RecordBatch>> {
         unimplemented!();
     }
 }
45 changes: 10 additions & 35 deletions flock-function/src/launcher.rs
@@ -14,54 +14,29 @@
 //! This crate is responsible for deploying the query to cloud function services
 //! on public clouds.

-use flock::error::{FlockError, Result};
+use async_trait::async_trait;
+use datafusion::arrow::record_batch::RecordBatch;
+use flock::error::Result;
 use flock::query::Query;

 /// Launcher is a trait that defines the interface for deploying and executing
 /// queries on cloud function services.
+#[async_trait]
 pub trait Launcher {
     /// Create a new launcher.
     ///
     /// # Arguments
-    /// `flow` - The query flow to be deployed.
-    fn new(query: &Query) -> Self;
+    /// `query` - The query to be deployed.
+    fn new<T: AsRef<str> + Send + Sync + 'static>(query: &Query<T>) -> Self;

     /// Deploy a query to a specific cloud function service.
     /// It is called before the query is executed.
     fn deploy(&self) -> Result<()>;

     /// Execute a query on a specific cloud function service.
     /// It is called after the query is deployed.
-    fn execute(&self) -> Result<()>;
-}
-
-/// LocalLauncher executes the query locally.
-pub struct LocalLauncher {}
-
-impl Launcher for LocalLauncher {
-    fn new(_query: &Query) -> Self {
-        LocalLauncher {}
-    }
-
-    fn deploy(&self) -> Result<()> {
-        Err(FlockError::Internal(
-            "Local execution doesn't require a deployment.".to_owned(),
-        ))
-    }
-
-    fn execute(&self) -> Result<()> {
-        unimplemented!();
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[tokio::test]
-    async fn version_check() -> Result<()> {
-        let manifest = cargo_toml::Manifest::from_str(include_str!("../Cargo.toml")).unwrap();
-        assert_eq!(env!("CARGO_PKG_VERSION"), manifest.package.unwrap().version);
-        Ok(())
-    }
+    ///
+    /// # Returns
+    /// A vector of record batches.
+    async fn execute(&self) -> Result<Vec<RecordBatch>>;
 }
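
For context, a minimal sketch, not part of this diff, of how a caller might drive the reworked trait. The `deploy_and_run` helper is hypothetical; note that `LocalLauncher` (added below) deliberately rejects `deploy`, so a purely local path would call `execute` directly.

    use crate::launcher::Launcher;
    use datafusion::arrow::record_batch::RecordBatch;
    use flock::error::Result;
    use flock::query::Query;

    /// Hypothetical helper: build a launcher for a query, deploy it, then run it.
    /// Invoked with a turbofish, e.g. `deploy_and_run::<AwsLambdaLauncher, _>(&query)`.
    async fn deploy_and_run<L, T>(query: &Query<T>) -> Result<Vec<RecordBatch>>
    where
        L: Launcher,
        T: AsRef<str> + Send + Sync + 'static,
    {
        let launcher = L::new(query);
        launcher.deploy()?; // a local launcher would skip this step
        launcher.execute().await
    }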
1 change: 1 addition & 0 deletions flock-function/src/lib.rs
@@ -31,3 +31,4 @@ pub mod aws;
 pub mod azure;
 pub mod gcp;
 mod launcher;
+pub mod local;
198 changes: 198 additions & 0 deletions flock-function/src/local.rs
@@ -0,0 +1,198 @@
// Copyright (c) 2020-present, UMD Database Group.
//
// This program is free software: you can use, redistribute, and/or modify
// it under the terms of the GNU Affero General Public License, version 3
// or later ("AGPL"), as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

//! This crate is responsible for executing queries on the local machine.

use crate::launcher::Launcher;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use flock::error::{FlockError, Result};
use flock::query::Query;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::Arc;

/// LocalLauncher executes the query locally.
pub struct LocalLauncher {
/// The physical plan of the query.
execution_plan: Arc<dyn ExecutionPlan>,
}

#[async_trait]
impl Launcher for LocalLauncher {
fn new<T>(query: &Query<T>) -> Self
where
T: AsRef<str> + Send + Sync + 'static,
{
LocalLauncher {
execution_plan: query.plan().unwrap(),
}
}

fn deploy(&self) -> Result<()> {
Err(FlockError::Internal(
"Local execution doesn't require a deployment.".to_owned(),
))
}

async fn execute(&self) -> Result<Vec<RecordBatch>> {
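// Run the physical plan with DataFusion and gather every output partition
// into a single Vec<RecordBatch>.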
collect(self.execution_plan.clone())
.await
.map_err(|e| FlockError::Execution(e.to_string()))
}
}

impl LocalLauncher {
/// Compare two execution plans' schemas.
/// Returns true if they belong to the same plan node.
///
/// # Arguments
/// * `schema1` - The first schema.
/// * `schema2` - The second schema.
///
/// # Returns
/// * `true` - If the schemas belong to the same plan node.
fn compare_schema(schema1: SchemaRef, schema2: SchemaRef) -> bool {
let (superset, subset) = if schema1.fields().len() >= schema2.fields().len() {
(schema1, schema2)
} else {
(schema2, schema1)
};

let fields = superset
.fields()
.iter()
.map(|f| f.name())
.collect::<HashSet<_>>();

subset.fields().iter().all(|f| fields.contains(&f.name()))
}
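
// For example: a plan node whose schema has fields {c1, c2} is treated as a
// match for a source batch whose schema has fields {c1, c2, c3}, since every
// field name in the smaller schema appears in the larger one.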

/// Feeds the query with data.
///
/// # Arguments
/// * `sources` - A list of data sources.
pub fn feed_data_sources(&mut self, sources: &[Vec<Vec<RecordBatch>>]) {
// Breadth-first search
let mut queue = VecDeque::new();
queue.push_front(self.execution_plan.clone());

while !queue.is_empty() {
let mut p = queue.pop_front().unwrap();
if p.children().is_empty() {
for partition in sources {
if LocalLauncher::compare_schema(p.schema(), partition[0][0].schema()) {
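// Schema matched: mutate the leaf MemoryExec in place so that it serves
// this source's partitions.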
unsafe {
Arc::get_mut_unchecked(&mut p)
.as_mut_any()
.downcast_mut::<MemoryExec>()
.unwrap()
.set_partitions(partition);
}
break;
}
}
}

p.children()
.iter()
.enumerate()
.for_each(|(i, _)| queue.push_back(p.children()[i].clone()));
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use datafusion::arrow::array::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use flock::assert_batches_eq;
use flock::datasource::DataSource;
use flock::query::Table;

#[tokio::test]
async fn version_check() -> Result<()> {
let manifest = cargo_toml::Manifest::from_str(include_str!("../Cargo.toml")).unwrap();
assert_eq!(env!("CARGO_PKG_VERSION"), manifest.package.unwrap().version);
Ok(())
}

#[tokio::test]
async fn local_launcher() -> Result<()> {
let table_name = "test_table";
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int64, false),
Field::new("c2", DataType::Float64, false),
Field::new("c3", DataType::Utf8, false),
Field::new("c4", DataType::UInt64, false),
Field::new("c5", DataType::Utf8, false),
Field::new("neg", DataType::Int64, false),
]));
let sql = "SELECT MIN(c1), AVG(c4), COUNT(c3) FROM test_table";
let query = Query::new(
sql,
vec![Table(table_name, schema.clone())],
DataSource::Memory,
);

let mut launcher = LocalLauncher::new(&query);

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(vec![90, 90, 91, 101, 92, 102, 93, 103])),
Arc::new(Float64Array::from(vec![
92.1, 93.2, 95.3, 96.4, 98.5, 99.6, 100.7, 101.8,
])),
Arc::new(StringArray::from(vec![
"a", "a", "d", "b", "b", "d", "c", "c",
])),
Arc::new(UInt64Array::from(vec![33, 1, 54, 33, 12, 75, 2, 87])),
Arc::new(StringArray::from(vec![
"rapport",
"pedantic",
"mimesis",
"haptic",
"baksheesh",
"amok",
"devious",
"c",
])),
Arc::new(Int64Array::from(vec![
-90, -90, -91, -101, -92, -102, -93, -103,
])),
],
)?;

launcher.feed_data_sources(&[vec![vec![batch]]]);
let batches = launcher.execute().await?;

let expected = vec![
"+--------------------+--------------------+----------------------+",
"| MIN(test_table.c1) | AVG(test_table.c4) | COUNT(test_table.c3) |",
"+--------------------+--------------------+----------------------+",
"| 90 | 37.125 | 8 |",
"+--------------------+--------------------+----------------------+",
];

assert_batches_eq!(&expected, &batches);

Ok(())
}
}
2 changes: 2 additions & 0 deletions flock/src/datasource/mod.rs
@@ -102,6 +102,8 @@ pub enum DataSource {
     Json,
     /// AWS S3 for baseline benchmark.
     S3(NEXMarkSource),
+    /// Data source from the local memory.
+    Memory,
     /// Unknown data source.
     UnknownEvent,
 }
2 changes: 1 addition & 1 deletion flock/src/prelude.rs
@@ -26,7 +26,7 @@ pub use crate::datasink::{DataSink, DataSinkFormat, DataSinkType};
 pub use crate::datasource::{nexmark, tpch, ysb, DataSource, DataStream, RelationPartitions};
 pub use crate::encoding::Encoding;
 pub use crate::error::{FlockError, Result};
-pub use crate::query::Query;
+pub use crate::query::{Query, Table};
 pub use crate::runtime::arena::{Arena, WindowSession};
 pub use crate::runtime::context::{CloudFunction, ExecutionContext};
 pub use crate::runtime::executor::{