Skip to content

Commit

Permalink
[*] add filter push down optimizer to work with MySQL
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Dec 16, 2020
1 parent 87a7e75 commit be0d33f
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 85 deletions.
48 changes: 29 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,25 @@ Give thanks to [ClickHouse](https://github.com/ClickHouse/ClickHouse) and [Arrow
* **High Scalability**
* **High Reliability**

## Status
#### SQL Support

- [x] Projection
- [x] Filter (WHERE)
- [x] Limit
- [x] Aggregate
- [x] Common math functions
- [ ] Sorting
- [ ] Subqueries
- [ ] Joins


## Architecture

| Crate | Description | Status |
|-----------|-------------|-------------|
| optimizers | Optimizer for distributed plan | WIP |
| distributed | Distributed scheduler and executor for planner | TODO |
| [optimizers](src/optimizers) | Optimizer for distributed plan | WIP |
| [datablocks](src/datablocks) | Vectorized data processing unit | WIP |
| [datastreams](src/datastreams) | Async streaming iterators | WIP |
| [datasources](src/datasources) | Interface to the data sources: [system.numbers for performance](src/datasources/system), or remote (S3 or other table storage engines) | WIP |
Expand All @@ -38,7 +50,7 @@ Give thanks to [ClickHouse](https://github.com/ClickHouse/ClickHouse) and [Arrow

* Dataset: 10,000,000,000 (10 Billion), system.numbers_mt
* Hardware: 8vCPUx16G KVM Cloud Instance

* Rust: rustc 1.50.0-nightly (f76ecd066 2020-12-15)

|Query |FuseQuery Cost| ClickHouse Cost|
|-------------------------------|---------------| ----|
Expand Down Expand Up @@ -83,14 +95,14 @@ $ mysql -h127.0.0.1 -P3307
###### Explain

```
mysql> explain select number as a, number/2 as b, number+1 as c from system.numbers_mt(10000000) where number < 4 limit 10;
mysql> explain select (number+1) as c1, number/2 as c2 from system.numbers_mt(10000000) where (c1+c2+1) < 100 limit 3;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| explain |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| └─ Limit: 10
└─ Projection: number as a, number / 2 as b, number + 1 as c
└─ Filter: number < 4
└─ ReadDataSource: scan parts [8](Read from system.numbers_mt table) |
| └─ Limit: 3
└─ Projection: (number + 1) as c1, (number / 2) as c2
└─ Filter: ((((number + 1) + (number / 2)) + 1) < 100)
└─ ReadDataSource: scan parts [8](Read from system.numbers_mt table) |
|
└─ LimitTransform × 1 processor
└─ Merge (LimitTransform × 8 processors) to (MergeProcessor × 1)
Expand All @@ -99,23 +111,21 @@ mysql> explain select number as a, number/2 as b, number+1 as c from system.num
└─ FilterTransform × 8 processors
└─ SourceTransform × 8 processors |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set (0.01 sec)
2 rows in set (0.00 sec)
```

###### Select

```
mysql> select number as a, number/2 as b, number+1 as c from system.numbers_mt(10000000) where number < 4 limit 10;
+------+------+------+
| a | b | c |
+------+------+------+
| 0 | 0 | 1 |
| 1 | 0 | 2 |
| 2 | 1 | 3 |
| 3 | 1 | 4 |
+------+------+------+
4 rows in set (0.10 sec)
mysql> select (number+1) as c1, number/2 as c2 from system.numbers_mt(10000000) where (c1+c2+1) < 100 limit 3;
+------+------+
| c1 | c2 |
+------+------+
| 1 | 0 |
| 2 | 0 |
| 3 | 1 |
+------+------+
3 rows in set (0.06 sec)
```

## How to Test?
Expand Down
10 changes: 10 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Build image for FuseQuery: installs the nightly toolchain, copies the
# project files into /tmp/fusequery and invokes the Makefile's `run` target.
FROM rust:1.48.0-buster AS builder

RUN cargo install cargo-build-deps
RUN rustup toolchain install nightly

# Create and enter the project directory before copying anything into it.
WORKDIR /tmp/fusequery
# The destination ends with a slash so it is treated as a directory; without
# it, COPY writes the source file to a regular file named `fusequery`.
COPY Makefile Cargo.toml /tmp/fusequery/
# COPY of a directory copies its contents, not the directory itself, so the
# destination names `src/` explicitly to keep the sources under src/.
COPY src /tmp/fusequery/src/
# NOTE(review): `make run` executes at image build time; confirm whether a
# build-only target (or a CMD) was intended here.
RUN make run
1 change: 1 addition & 0 deletions src/optimizers/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl Optimizer {
vec![left.as_ref().clone(), right.as_ref().clone()]
}
ExpressionPlan::Function { args, .. } => args.clone(),
ExpressionPlan::Wildcard => vec![expr.clone()],
})
}
}
16 changes: 10 additions & 6 deletions src/optimizers/optimizer_filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,29 @@ impl FilterPushDownOptimizer {
}

/// Replaces each column reference with the expression it names in the projection.
fn rewrite(
fn rewrite_alias_expr(
expr: &ExpressionPlan,
projection: &HashMap<String, ExpressionPlan>,
) -> FuseQueryResult<ExpressionPlan> {
let expressions = Optimizer::expression_plan_children(expr)?;

let expressions = expressions
.iter()
.map(|e| rewrite(e, &projection))
.map(|e| rewrite_alias_expr(e, &projection))
.collect::<FuseQueryResult<Vec<_>>>()?;

if let ExpressionPlan::Field(name) = expr {
if let Some(expr) = projection.get(name) {
return Ok(expr.clone());
}
}
Ok(rewrite_expression(&expr, &expressions))
Ok(rebuild_alias_from_exprs(&expr, &expressions))
}

fn rewrite_expression(expr: &ExpressionPlan, expressions: &[ExpressionPlan]) -> ExpressionPlan {
fn rebuild_alias_from_exprs(
expr: &ExpressionPlan,
expressions: &[ExpressionPlan],
) -> ExpressionPlan {
match expr {
ExpressionPlan::Alias(alias, _) => {
ExpressionPlan::Alias(alias.clone(), Box::from(expressions[0].clone()))
Expand All @@ -51,6 +54,7 @@ fn rewrite_expression(expr: &ExpressionPlan, expressions: &[ExpressionPlan]) ->
op: op.clone(),
args: expressions.to_vec(),
},
other => other.clone(),
}
}

Expand All @@ -60,12 +64,12 @@ impl IOptimizer for FilterPushDownOptimizer {
}

fn optimize(&mut self, plan: &PlanNode) -> FuseQueryResult<PlanNode> {
let mut plans = plan.to_plans()?;
let mut plans = plan.node_to_plans()?;
let projection_map = Optimizer::projection_to_map(plan)?;

for plan in plans.iter_mut() {
if let PlanNode::Filter(filter) = plan {
let rewritten = rewrite(&filter.predicate, &projection_map)?;
let rewritten = rewrite_alias_expr(&filter.predicate, &projection_map)?;
let new_filter = FilterPlan {
predicate: rewritten,
input: filter.input.clone(),
Expand Down
8 changes: 4 additions & 4 deletions src/optimizers/optimizer_filter_push_down_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Code is licensed under AGPL License, Version 3.0.

#[test]
fn test_filter_push_down() -> crate::error::FuseQueryResult<()> {
fn test_filter_push_down_optimizer() -> crate::error::FuseQueryResult<()> {
use std::sync::Arc;

use crate::contexts::*;
Expand All @@ -18,14 +18,14 @@ fn test_filter_push_down() -> crate::error::FuseQueryResult<()> {
));
let plan = Planner::new().build_from_sql(
ctx.clone(),
"select (number+1) as c1, number from system.numbers_mt where c1=1",
"select (number+1) as c1, number as c2 from system.numbers_mt where (c1+c2+1)=1",
)?;

let mut filter_push_down = FilterPushDownOptimizer::create();
let optimized = filter_push_down.optimize(&plan)?;
let expect = "\
└─ Projection: number + 1 as c1, number\
\n └─ Filter: number + 1 = 1\
└─ Projection: (number + 1) as c1, number as c2\
\n └─ Filter: ((((number + 1) + number) + 1) = 1)\
\n └─ ReadDataSource: scan parts [8](Read from system.numbers_mt table)";
let actual = format!("{:?}", optimized);
assert_eq!(expect, actual);
Expand Down
1 change: 1 addition & 0 deletions src/planners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Code is licensed under AGPL License, Version 3.0.

mod plan_filter_test;
mod plan_select_test;

mod parser;
mod plan_aggregate;
Expand Down
30 changes: 26 additions & 4 deletions src/planners/plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::sync::Arc;
use crate::datavalues::{DataField, DataSchema, DataSchemaRef};
use crate::error::FuseQueryResult;
use crate::planners::{
AggregatePlan, EmptyPlan, ExpressionPlan, FilterPlan, LimitPlan, PlanNode, ProjectionPlan,
ScanPlan,
field, AggregatePlan, EmptyPlan, ExplainPlan, ExpressionPlan, FilterPlan, LimitPlan, PlanNode,
ProjectionPlan, ScanPlan, SelectPlan,
};

pub struct PlanBuilder {
Expand Down Expand Up @@ -37,14 +37,24 @@ impl PlanBuilder {
/// Apply a projection.
pub fn project(&self, exprs: Vec<ExpressionPlan>) -> FuseQueryResult<Self> {
let input_schema = self.plan.schema();
let fields: Vec<DataField> = exprs

let mut projection_exprs = vec![];
exprs.iter().for_each(|v| match v {
ExpressionPlan::Wildcard => {
for i in 0..input_schema.fields().len() {
projection_exprs.push(field(input_schema.fields()[i].name()))
}
}
_ => projection_exprs.push(v.clone()),
});
let fields: Vec<DataField> = projection_exprs
.iter()
.map(|expr| expr.to_field(&input_schema))
.collect::<FuseQueryResult<_>>()?;

Ok(Self::from(&PlanNode::Projection(ProjectionPlan {
input: Arc::new(self.plan.clone()),
expr: exprs,
expr: projection_exprs,
schema: Arc::new(DataSchema::new(fields)),
})))
}
Expand Down Expand Up @@ -114,6 +124,18 @@ impl PlanBuilder {
})))
}

/// Wrap a copy of the current plan in a `PlanNode::Select` node and return
/// a builder positioned on that new node.
pub fn select(&self) -> FuseQueryResult<Self> {
    let wrapped = SelectPlan {
        plan: Box::new(self.plan.clone()),
    };
    let node = PlanNode::Select(wrapped);
    Ok(Self::from(&node))
}

/// Wrap a copy of the current plan in a `PlanNode::Explain` node and return
/// a builder positioned on that new node.
pub fn explain(&self) -> FuseQueryResult<Self> {
    let wrapped = ExplainPlan {
        plan: Box::new(self.plan.clone()),
    };
    let node = PlanNode::Explain(wrapped);
    Ok(Self::from(&node))
}

/// Build the plan
pub fn build(&self) -> FuseQueryResult<PlanNode> {
Ok(self.plan.clone())
Expand Down
6 changes: 4 additions & 2 deletions src/planners/plan_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ impl PlanNode {
// Empty.
PlanNode::Empty(_) => write!(f, ""),
PlanNode::Scan(_) => write!(f, ""),
PlanNode::Select(_) => write!(f, ""),
PlanNode::Select(_) => {
write!(f, "")
}
PlanNode::Explain(_) => write!(f, ""),
}
}
Expand All @@ -75,7 +77,7 @@ impl fmt::Debug for PlanNode {
prefix: "└─",
};

let mut plans = self.to_plans().map_err(|_| std::fmt::Error)?;
let mut plans = self.children_to_plans().map_err(|_| std::fmt::Error)?;
plans.reverse();
for node in plans.iter() {
node.format(f, setting)?;
Expand Down
4 changes: 2 additions & 2 deletions src/planners/plan_explain_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ fn test_explain_plan() -> crate::error::FuseQueryResult<()> {
ctx.clone(),
"explain select number as c1, number as c2, number as c3,(number+1) from system.numbers_mt where (number+1)=4",
)?;
let expect = "└─ Projection: number as c1, number as c2, number as c3, number + 1\
\n └─ Filter: number + 1 = 4\
let expect = "└─ Projection: number as c1, number as c2, number as c3, (number + 1)\
\n └─ Filter: ((number + 1) = 4)\
\n └─ ReadDataSource: scan parts [8](Read from system.numbers_mt table)";
let actual = format!("{:?}", plan);
assert_eq!(expect, actual);
Expand Down
9 changes: 7 additions & 2 deletions src/planners/plan_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::fmt;

use crate::datavalues::{DataField, DataSchemaRef, DataValue};
use crate::error::FuseQueryResult;
use crate::error::{FuseQueryError, FuseQueryResult};
use crate::functions::{
AliasFunction, ConstantFunction, FieldFunction, Function, ScalarFunctionFactory,
};
Expand All @@ -24,6 +24,7 @@ pub enum ExpressionPlan {
op: String,
args: Vec<ExpressionPlan>,
},
Wildcard,
}

impl ExpressionPlan {
Expand Down Expand Up @@ -63,6 +64,9 @@ impl ExpressionPlan {
func.set_depth(depth);
AliasFunction::try_create(alias.clone(), func)
}
ExpressionPlan::Wildcard => Err(FuseQueryError::Internal(
"Cannot transform wildcard to function".to_string(),
)),
}
}

Expand Down Expand Up @@ -92,9 +96,10 @@ impl fmt::Debug for ExpressionPlan {
ExpressionPlan::Field(ref v) => write!(f, "{:#}", v),
ExpressionPlan::Constant(ref v) => write!(f, "{:#}", v),
ExpressionPlan::BinaryExpression { left, op, right } => {
write!(f, "{:?} {} {:?}", left, op, right,)
write!(f, "({:?} {} {:?})", left, op, right,)
}
ExpressionPlan::Function { op, args } => write!(f, "{}({:?})", op, args),
ExpressionPlan::Wildcard => write!(f, "*"),
}
}
}
2 changes: 1 addition & 1 deletion src/planners/plan_filter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn test_filter_plan() -> crate::error::FuseQueryResult<()> {

let expect = "\
└─ Projection: number\
\n └─ Filter: number = 1\
\n └─ Filter: (number = 1)\
\n └─ ReadDataSource: scan parts [8](Read from system.numbers_mt table)";
let actual = format!("{:?}", plan);

Expand Down
24 changes: 17 additions & 7 deletions src/planners/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl PlanNode {
}
}

pub fn to_plans(&self) -> FuseQueryResult<Vec<PlanNode>> {
fn to_array(&self, with_parent: bool) -> FuseQueryResult<Vec<PlanNode>> {
let max_depth = 128;
let mut depth = 0;
let mut result = vec![];
Expand Down Expand Up @@ -88,10 +88,16 @@ impl PlanNode {
depth += 1;
}
PlanNode::Select(v) => {
if with_parent {
result.push(PlanNode::Select(v.clone()));
}
plan = v.plan.as_ref().clone();
depth += 1;
}
PlanNode::Explain(v) => {
if with_parent {
result.push(PlanNode::Explain(v.clone()));
}
plan = v.plan.as_ref().clone();
depth += 1;
}
Expand All @@ -114,6 +120,14 @@ impl PlanNode {
Ok(result)
}

/// Flatten this plan into a list of nodes, leaving out the `Select` and
/// `Explain` wrapper nodes themselves (their inner plans are still walked).
pub fn children_to_plans(&self) -> FuseQueryResult<Vec<PlanNode>> {
    let with_parent = false;
    self.to_array(with_parent)
}

/// Flatten this plan into a list of nodes, keeping the `Select` and
/// `Explain` wrapper nodes in the result alongside their inner plans.
pub fn node_to_plans(&self) -> FuseQueryResult<Vec<PlanNode>> {
    let with_parent = true;
    self.to_array(with_parent)
}

pub fn plans_to_node(array: &[PlanNode]) -> FuseQueryResult<PlanNode> {
let mut builder = PlanBuilder::empty(false);
for plan in array {
Expand All @@ -134,14 +148,10 @@ impl PlanNode {
builder = PlanBuilder::from(&PlanNode::ReadSource(v.clone()))
}
PlanNode::Explain(_v) => {
return Ok(PlanNode::Explain(ExplainPlan {
plan: Box::new(builder.build()?),
}))
builder = builder.explain()?;
}
PlanNode::Select(_v) => {
return Ok(PlanNode::Select(SelectPlan {
plan: Box::new(builder.build()?),
}))
builder = builder.select()?;
}
PlanNode::Empty(_) => {}
PlanNode::Scan(_) => {}
Expand Down
Loading

0 comments on commit be0d33f

Please sign in to comment.