Skip to content

Commit

Permalink
nextmark: local execution - query 4 (#228)
Browse files Browse the repository at this point in the history
  • Loading branch information
gangliao authored Apr 7, 2021
1 parent dd048cd commit ed8b9bb
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/runtime/src/datasource/nexmark/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ impl Bid {
Field::new("auction", DataType::Int32, false),
Field::new("bidder", DataType::Int32, false),
Field::new("price", DataType::Int32, false),
Field::new("date_time", DataType::Date64, false),
Field::new("b_date_time", DataType::Date64, false),
])
}

Expand Down
2 changes: 2 additions & 0 deletions src/runtime/src/datasource/nexmark/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@

mod q0;
mod q1;
// mod q2;
mod q3;
mod q4;
2 changes: 1 addition & 1 deletion src/runtime/src/datasource/nexmark/queries/q1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mod tests {
// data source generation
let events = nex.generate_data()?;

let sql = "SELECT auction, bidder, 0.908 * price as price, date_time FROM bid";
let sql = "SELECT auction, bidder, 0.908 * price as price, b_date_time FROM bid";

let schema = Arc::new(Bid::schema());

Expand Down
9 changes: 6 additions & 3 deletions src/runtime/src/datasource/nexmark/queries/q3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ mod tests {
#[tokio::test]
async fn local_query_3() -> Result<()> {
// benchmark configuration
let nex = NexMarkSource::new(5, 1, 1_000, StreamWindow::None);
let seconds = 5;
let threads = 1;
let event_per_second = 1000;
let nex = NexMarkSource::new(seconds, threads, event_per_second, StreamWindow::None);

// data source generation
let events = nex.generate_data()?;
Expand All @@ -44,14 +47,14 @@ mod tests {
"FROM ",
" auction INNER JOIN person on seller = p_id ",
"WHERE ",
" (state = 'or' OR state = 'id' OR state = 'ca');"
" category = 10 and (state = 'or' OR state = 'id' OR state = 'ca');"
);

let auction_schema = Arc::new(Auction::schema());
let person_schema = Arc::new(Person::schema());

// sequential processing
for i in 0..5 {
for i in 0..seconds {
// events to record batches
let am = events.auctions.get(&Date::new(i)).unwrap();
let (auctions, _) = am.get(&0).unwrap();
Expand Down
90 changes: 90 additions & 0 deletions src/runtime/src/datasource/nexmark/queries/q4.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2021 UMD Database Group. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[allow(dead_code)]
fn main() {}

#[cfg(test)]
mod tests {
use super::*;
use crate::datasource::nexmark::event::{Auction, Bid, Date, Person};
use crate::datasource::nexmark::{NexMarkEvents, NexMarkSource};
use crate::error::Result;
use crate::executor::plan::physical_plan;
use crate::query::StreamWindow;
use arrow::json;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::collect;
use std::io::BufReader;
use std::io::Write;
use std::sync::Arc;

#[tokio::test]
async fn local_query_4() -> Result<()> {
// benchmark configuration
let seconds = 2;
let threads = 1;
let event_per_second = 1000;
let nex = NexMarkSource::new(seconds, threads, event_per_second, StreamWindow::None);

// data source generation
let events = nex.generate_data()?;

let sql = concat!(
"SELECT ",
" category, ",
" AVG(final) ",
"FROM ( ",
" SELECT MAX(price) AS final, category ",
" FROM auction INNER JOIN bid on a_id = auction ",
" WHERE b_date_time BETWEEN a_date_time AND expires ",
" GROUP BY a_id, category ",
") as Q ",
"GROUP BY category;"
);

let auction_schema = Arc::new(Auction::schema());
let bid_schema = Arc::new(Bid::schema());

// sequential processing
for i in 0..seconds {
// events to record batches
let am = events.auctions.get(&Date::new(i)).unwrap();
let (auctions, _) = am.get(&0).unwrap();
let auctions_batches = NexMarkSource::to_batch(&auctions, auction_schema.clone());

let bm = events.bids.get(&Date::new(i)).unwrap();
let (bids, _) = bm.get(&0).unwrap();
let bids_batches = NexMarkSource::to_batch(&bids, bid_schema.clone());

// register memory tables
let mut ctx = datafusion::execution::context::ExecutionContext::new();
let auction_table = MemTable::try_new(auction_schema.clone(), vec![auctions_batches])?;
ctx.register_table("auction", Arc::new(auction_table));

let bid_table = MemTable::try_new(bid_schema.clone(), vec![bids_batches])?;
ctx.register_table("bid", Arc::new(bid_table));

// optimize query plan and execute it
let physical_plan = physical_plan(&mut ctx, &sql)?;
let batches = collect(physical_plan).await?;

// show output
let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
println!("{}", formatted);
}

Ok(())
}
}

0 comments on commit ed8b9bb

Please sign in to comment.