Skip to content

Commit

Permalink
nexmark benchmark: local execution - query 6 (#237)
Browse files Browse the repository at this point in the history
  • Loading branch information
gangliao authored Apr 10, 2021
1 parent 5874a88 commit 0f59e50
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/runtime/src/datasource/nexmark/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ mod q1;
mod q3;
mod q4;
mod q5;
mod q6;
mod q7;
mod q8;
158 changes: 158 additions & 0 deletions src/runtime/src/datasource/nexmark/queries/q6.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2021 UMD Database Group. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Empty placeholder function; everything else visible in this file lives in the `tests` module.
// NOTE(review): `dead_code` is presumably allowed because nothing calls this function —
// confirm whether the placeholder is still needed.
#[allow(dead_code)]
fn main() {}

#[cfg(test)]
mod tests {
use super::*;
use crate::datasource::nexmark::event::{Auction, Bid, Date, Person};
use crate::datasource::nexmark::{NexMarkEvents, NexMarkSource};
use crate::error::Result;
use crate::executor::plan::physical_plan;
use crate::query::StreamWindow;
use arrow::array::UInt64Array;
use arrow::json;
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::limit::truncate_batch;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{collect, collect_partitioned};
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use futures::stream::StreamExt;
use std::io::BufReader;
use std::io::Write;
use std::sync::Arc;

/// Redistributes in-memory record batches according to `partitioning`.
///
/// The input partitions are wrapped in a `MemoryExec`, which is then fed to a
/// `RepartitionExec`. Every output partition is executed in turn and its
/// batches are gathered, so the returned vector has one entry per output
/// partition.
///
/// # Panics
///
/// Panics if `input_partitions` is empty or its first partition holds no
/// batch, because the schema is read from `input_partitions[0][0]`.
///
/// # Errors
///
/// Propagates any error raised while building the plans, starting a
/// partition's stream, or producing a batch.
async fn repartition(
    input_partitions: Vec<Vec<RecordBatch>>,
    partitioning: Partitioning,
) -> Result<Vec<Vec<RecordBatch>>> {
    // The schema of the very first batch stands in for the whole input.
    let schema = input_partitions[0][0].schema();

    // Physical plan: in-memory source -> repartition operator.
    let source = MemoryExec::try_new(&input_partitions, schema, None)?;
    let repartitioned = RepartitionExec::try_new(Arc::new(source), partitioning)?;

    // Drain each *output* partition completely before moving on to the next.
    let partition_count = repartitioned.partitioning().partition_count();
    let mut collected = Vec::with_capacity(partition_count);
    for partition in 0..partition_count {
        let mut stream = repartitioned.execute(partition).await?;
        let mut partition_batches = Vec::new();
        while let Some(batch) = stream.next().await {
            partition_batches.push(batch?);
        }
        collected.push(partition_batches);
    }

    Ok(collected)
}

/// Runs NEXMark query 6 locally: the average selling price per seller for
/// their last 10 closed auctions.
///
/// For every generated second of events the test:
/// 1. counts the distinct sellers that have bids inside their auction window,
/// 2. computes the maximum bid price per auction and seller, then
///    repartitions the result with `Partitioning::HashDiff` on `seller`,
/// 3. keeps at most 10 rows of each resulting partition,
/// 4. averages the kept prices per seller and prints the result.
#[tokio::test]
async fn local_query_6() -> Result<()> {
    // Benchmark configuration and event generation.
    let seconds = 2;
    let threads = 1;
    let event_per_second = 1000;
    let source = NexMarkSource::new(seconds, threads, event_per_second, StreamWindow::None);
    let generated = source.generate_data()?;

    // Number of distinct sellers with at least one bid inside the auction window.
    let distinct_sellers_sql = concat!(
        "SELECT COUNT(DISTINCT seller) ",
        "FROM auction INNER JOIN bid ON a_id = auction ",
        "WHERE b_date_time between a_date_time and expires ",
    );

    // Highest bid per (auction, seller) inside the auction window.
    let max_price_sql = concat!(
        "SELECT seller, MAX(price) AS final ",
        "FROM auction INNER JOIN bid ON a_id = auction ",
        "WHERE b_date_time between a_date_time and expires ",
        "GROUP BY a_id, seller ORDER by seller;"
    );

    // Final aggregation over the intermediate table `Q`.
    let avg_price_sql = "SELECT seller, AVG(final) FROM Q GROUP BY seller;";

    let auction_schema = Arc::new(Auction::schema());
    let bid_schema = Arc::new(Bid::schema());

    // Epochs are processed one after another.
    for epoch in 0..seconds {
        // Turn this epoch's auction events into record batches.
        let auctions_by_source = generated.auctions.get(&Date::new(epoch)).unwrap();
        let (auction_events, _) = auctions_by_source.get(&0).unwrap();
        let auction_batches = NexMarkSource::to_batch(&auction_events, auction_schema.clone());

        // Same for this epoch's bid events.
        let bids_by_source = generated.bids.get(&Date::new(epoch)).unwrap();
        let (bid_events, _) = bids_by_source.get(&0).unwrap();
        let bid_batches = NexMarkSource::to_batch(&bid_events, bid_schema.clone());

        // A fresh context per epoch, with both inputs registered as memory tables.
        let mut ctx = datafusion::execution::context::ExecutionContext::new();
        let auction_table = MemTable::try_new(auction_schema.clone(), vec![auction_batches])?;
        ctx.register_table("auction", Arc::new(auction_table));
        let bid_table = MemTable::try_new(bid_schema.clone(), vec![bid_batches])?;
        ctx.register_table("bid", Arc::new(bid_table));

        // 1. get the total distinct sellers during the epoch
        let count_plan = physical_plan(&mut ctx, &distinct_sellers_sql)?;
        let count_batches = collect(count_plan).await?;
        let count_column = count_batches[0].column(0);
        let total_distinct_sellers = count_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap()
            .value(0);

        // 2. get the max price of auctions for each seller, then repartition
        //    on `seller` into as many partitions as there are distinct sellers
        let max_price_plan = physical_plan(&mut ctx, &max_price_sql)?;
        let max_price_partitions = collect_partitioned(max_price_plan).await?;
        let seller_partitioning = Partitioning::HashDiff(
            vec![Arc::new(Column::new(&"seller"))],
            total_distinct_sellers as usize,
        );
        let per_seller = repartition(max_price_partitions, seller_partitioning).await?;

        // 3. simulate `Partition By 10 recent rows for each seller`:
        //    every partition must hold exactly one batch, cut down to 10 rows
        let mut recent_rows: Vec<RecordBatch> = Vec::with_capacity(per_seller.len());
        for partition in per_seller.iter() {
            assert_eq!(partition.len(), 1);
            recent_rows.push(truncate_batch(&partition[0], 10));
        }

        // 4. the average selling price per seller for their last 10 closed auctions.
        let q_schema = recent_rows[0].schema();
        let q_table = MemTable::try_new(q_schema, vec![recent_rows])?;
        ctx.register_table("Q", Arc::new(q_table));
        let avg_plan = physical_plan(&mut ctx, &avg_price_sql)?;
        let averages = collect(avg_plan).await?;

        // show output
        let formatted = arrow::util::pretty::pretty_format_batches(&averages).unwrap();
        println!("{}", formatted);
    }

    Ok(())
}
}

0 comments on commit 0f59e50

Please sign in to comment.