Skip to content

Commit

Permalink
fix: rename transform to transmute (#385)
Browse files Browse the repository at this point in the history
* fix: rename transform to transmute

* fix: enable test in ci
  • Loading branch information
gangliao authored Dec 26, 2021
1 parent e612513 commit 2f11574
Show file tree
Hide file tree
Showing 30 changed files with 135 additions and 139 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ jobs:
with:
filters: |
src:
- 'bench/**'
- 'src/**'
- 'benchmarks/**'
- 'flock/**'
- 'flock-cli/**'
- 'flock-function/**'
- 'playground/**'
- name: build and tests
if: steps.changes.outputs.src == 'true'
run: |
Expand Down
9 changes: 6 additions & 3 deletions benchmarks/nexmark/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ pub fn nexmark_query(query_number: usize) -> String {
mod tests {
use super::*;
use arrow::util::pretty::pretty_format_batches;
use flock::transmute::event_bytes_to_batch;

#[tokio::test]
async fn nexmark_sql_queries() -> Result<()> {
Expand Down Expand Up @@ -370,14 +371,16 @@ mod tests {

flock_ctx
.feed_data_sources(&[
vec![NEXMarkSource::to_batch(&event.bids, NEXMARK_BID.clone())],
vec![NEXMarkSource::to_batch(
vec![event_bytes_to_batch(&event.bids, NEXMARK_BID.clone(), 1024)],
vec![event_bytes_to_batch(
&event.persons,
NEXMARK_PERSON.clone(),
1024,
)],
vec![NEXMarkSource::to_batch(
vec![event_bytes_to_batch(
&event.auctions,
NEXMARK_AUCTION.clone(),
1024,
)],
])
.await?;
Expand Down
9 changes: 7 additions & 2 deletions benchmarks/ysb/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ fn ysb_query() -> String {
mod tests {
use super::*;
use arrow::util::pretty::pretty_format_batches;
use flock::transmute::event_bytes_to_batch;

#[tokio::test]
async fn ysb_sql_query() -> Result<()> {
Expand All @@ -239,8 +240,12 @@ mod tests {

flock_ctx
.feed_data_sources(&[
vec![YSBSource::to_batch(&event.ad_events, YSB_AD_EVENT.clone())],
vec![YSBSource::to_batch(&campaigns, YSB_CAMPAIGN.clone())],
vec![event_bytes_to_batch(
&event.ad_events,
YSB_AD_EVENT.clone(),
1024,
)],
vec![event_bytes_to_batch(&campaigns, YSB_CAMPAIGN.clone(), 1024)],
])
.await?;

Expand Down
2 changes: 1 addition & 1 deletion flock/src/datasink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::config::FLOCK_CONF;
use crate::encoding::Encoding;
use crate::error::{FlockError, Result};
use crate::runtime::payload::DataFrame;
use crate::runtime::transform::*;
use crate::transmute::*;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use arrow_flight::utils::flight_data_from_arrow_batch;
Expand Down
67 changes: 17 additions & 50 deletions flock/src/datasource/nexmark/nexmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ use crate::error::FlockError;
use crate::error::Result;
use crate::runtime::payload::{Payload, Uuid};
use crate::runtime::query::StreamWindow;
use crate::runtime::transform::*;
use crate::transmute::*;
use arrow::datatypes::SchemaRef;
use arrow::json;
use arrow::record_batch::RecordBatch;
use lazy_static::lazy_static;
use log::info;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::BufReader;
use std::sync::{Arc, Mutex};
use std::thread;

Expand Down Expand Up @@ -181,28 +179,16 @@ impl DataStream for NEXMarkStream {
};
let (r1, r2) = match query_number.expect("Query number is not set.") {
0 | 1 | 2 | 5 | 7 => (
NEXMarkSource::to_batch_v2(&event.bids, NEXMARK_BID.clone(), granule_size * 2),
event_bytes_to_batch(&event.bids, NEXMARK_BID.clone(), granule_size * 2),
vec![],
),
3 | 8 => (
NEXMarkSource::to_batch_v2(
&event.persons,
NEXMARK_PERSON.clone(),
granule_size / 5,
),
NEXMarkSource::to_batch_v2(
&event.auctions,
NEXMARK_AUCTION.clone(),
granule_size / 5,
),
event_bytes_to_batch(&event.persons, NEXMARK_PERSON.clone(), granule_size / 5),
event_bytes_to_batch(&event.auctions, NEXMARK_AUCTION.clone(), granule_size / 5),
),
4 | 6 | 9 => (
NEXMarkSource::to_batch_v2(
&event.auctions,
NEXMARK_AUCTION.clone(),
granule_size / 8,
),
NEXMarkSource::to_batch_v2(&event.bids, NEXMARK_BID.clone(), granule_size * 2),
event_bytes_to_batch(&event.auctions, NEXMARK_AUCTION.clone(), granule_size / 8),
event_bytes_to_batch(&event.bids, NEXMARK_BID.clone(), granule_size * 2),
),
_ => unimplemented!(),
};
Expand Down Expand Up @@ -262,22 +248,23 @@ impl DataStream for NEXMarkStream {
time, persons_num, auctions_num, bids_num
);

let batch_size = *FLOCK_SYNC_GRANULE_SIZE;
match query_number.expect("Query number is not set.") {
0 | 1 | 2 | 5 | 7 => Ok(to_payload(
&NEXMarkSource::to_batch(&event.bids, NEXMARK_BID.clone()),
&event_bytes_to_batch(&event.bids, NEXMARK_BID.clone(), batch_size),
&[],
uuid,
sync,
)),
3 | 8 => Ok(to_payload(
&NEXMarkSource::to_batch(&event.persons, NEXMARK_PERSON.clone()),
&NEXMarkSource::to_batch(&event.auctions, NEXMARK_AUCTION.clone()),
&event_bytes_to_batch(&event.persons, NEXMARK_PERSON.clone(), batch_size),
&event_bytes_to_batch(&event.auctions, NEXMARK_AUCTION.clone(), batch_size),
uuid,
sync,
)),
4 | 6 | 9 => Ok(to_payload(
&NEXMarkSource::to_batch(&event.auctions, NEXMARK_AUCTION.clone()),
&NEXMarkSource::to_batch(&event.bids, NEXMARK_BID.clone()),
&event_bytes_to_batch(&event.auctions, NEXMARK_AUCTION.clone(), batch_size),
&event_bytes_to_batch(&event.bids, NEXMARK_BID.clone(), batch_size),
uuid,
sync,
)),
Expand Down Expand Up @@ -399,28 +386,6 @@ impl NEXMarkSource {
Ok(std::mem::take(&mut events))
}

/// Converts NEXMarkSource events to record batches in Arrow.
pub fn to_batch(events: &[u8], schema: SchemaRef) -> Vec<RecordBatch> {
let batch_size = *FLOCK_SYNC_GRANULE_SIZE;
let mut reader = json::Reader::new(BufReader::new(events), schema, batch_size, None);

let mut batches = vec![];
while let Some(batch) = reader.next().unwrap() {
batches.push(batch);
}
batches
}

/// Converts NEXMarkSource events to record batches in Arrow.
pub fn to_batch_v2(events: &[u8], schema: SchemaRef, batch_size: usize) -> Vec<RecordBatch> {
let mut reader = json::Reader::new(BufReader::new(events), schema, batch_size, None);
let mut batches = vec![];
while let Some(batch) = reader.next().unwrap() {
batches.push(batch);
}
batches
}

/// Counts the number of events. (for testing)
pub fn count_events(&self, events: &NEXMarkStream) -> usize {
let threads: usize = self.config.get_as_or("threads", 100);
Expand Down Expand Up @@ -509,18 +474,20 @@ mod test {
// decompression and deserialization
let de_events: NEXMarkEvent = serde_json::from_value(values).unwrap();

let batch_size = *FLOCK_SYNC_GRANULE_SIZE;

let person_schema = Arc::new(Person::schema());
let batches = NEXMarkSource::to_batch(&de_events.persons, person_schema);
let batches = event_bytes_to_batch(&de_events.persons, person_schema, batch_size);
let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
println!("{}", formatted);

let auction_schema = Arc::new(Auction::schema());
let batches = NEXMarkSource::to_batch(&de_events.auctions, auction_schema);
let batches = event_bytes_to_batch(&de_events.auctions, auction_schema, batch_size);
let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
println!("{}", formatted);

let bid_schema = Arc::new(Bid::schema());
let batches = NEXMarkSource::to_batch(&de_events.bids, bid_schema);
let batches = event_bytes_to_batch(&de_events.bids, bid_schema, batch_size);
let formatted = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
println!("{}", formatted);

Expand Down
3 changes: 2 additions & 1 deletion flock/src/datasource/nexmark/queries/q0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod tests {
use crate::error::Result;
use crate::runtime::executor::plan::physical_plan;
use crate::runtime::query::StreamWindow;
use crate::transmute::event_bytes_to_batch;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::collect;
use std::sync::Arc;
Expand All @@ -42,7 +43,7 @@ mod tests {
// events to record batches
let bm = events.bids.get(&Epoch::new(i)).unwrap();
let (bids, _) = bm.get(&0).unwrap();
let batches = NEXMarkSource::to_batch(bids, schema.clone());
let batches = event_bytes_to_batch(bids, schema.clone(), 1024);

// register memory table
let mut ctx = datafusion::execution::context::ExecutionContext::new();
Expand Down
3 changes: 2 additions & 1 deletion flock/src/datasource/nexmark/queries/q1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod tests {
use crate::error::Result;
use crate::runtime::executor::plan::physical_plan;
use crate::runtime::query::StreamWindow;
use crate::transmute::event_bytes_to_batch;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::collect;
use std::sync::Arc;
Expand All @@ -43,7 +44,7 @@ mod tests {
// events to record batches
let bm = events.bids.get(&Epoch::new(i)).unwrap();
let (bids, _) = bm.get(&0).unwrap();
let batches = NEXMarkSource::to_batch(bids, schema.clone());
let batches = event_bytes_to_batch(bids, schema.clone(), 1024);

// register memory table
let mut ctx = datafusion::execution::context::ExecutionContext::new();
Expand Down
3 changes: 2 additions & 1 deletion flock/src/datasource/nexmark/queries/q10.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod tests {
use crate::error::Result;
use crate::runtime::executor::plan::physical_plan;
use crate::runtime::query::StreamWindow;
use crate::transmute::event_bytes_to_batch;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::collect;
use indoc::indoc;
Expand Down Expand Up @@ -50,7 +51,7 @@ mod tests {
// events to record batches
let bm = events.bids.get(&Epoch::new(i)).unwrap();
let (bids, _) = bm.get(&0).unwrap();
let batches = NEXMarkSource::to_batch(bids, schema.clone());
let batches = event_bytes_to_batch(bids, schema.clone(), 1024);

// register memory table
let mut ctx = datafusion::execution::context::ExecutionContext::new();
Expand Down
4 changes: 2 additions & 2 deletions flock/src/datasource/nexmark/queries/q11.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod tests {
use crate::error::{FlockError, Result};
use crate::runtime::executor::plan::physical_plan;
use crate::runtime::query::{Schedule, StreamWindow};
use crate::runtime::transform::*;
use crate::transmute::*;
use arrow::array::{Int32Array, TimestampMillisecondArray, UInt64Array};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
Expand Down Expand Up @@ -74,7 +74,7 @@ mod tests {
for i in 0..seconds {
let bm = events.bids.get(&Epoch::new(i)).unwrap();
let (bids, _) = bm.get(&0).unwrap();
let mut batches = vec![NEXMarkSource::to_batch(bids, schema.clone())];
let mut batches = vec![event_bytes_to_batch(bids, schema.clone(), 1024)];

// register memory tables
let mut ctx = datafusion::execution::context::ExecutionContext::new();
Expand Down
3 changes: 2 additions & 1 deletion flock/src/datasource/nexmark/queries/q13.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod tests {
use crate::error::Result;
use crate::runtime::executor::plan::physical_plan;
use crate::runtime::query::StreamWindow;
use crate::transmute::event_bytes_to_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::util::pretty::pretty_format_batches;
use datafusion::datasource::MemTable;
Expand Down Expand Up @@ -105,7 +106,7 @@ mod tests {
// events to record batches
let bm = events.bids.get(&Epoch::new(i)).unwrap();
let (bids, _) = bm.get(&0).unwrap();
let bids_batches = NEXMarkSource::to_batch(bids, bid_schema.clone());
let bids_batches = event_bytes_to_batch(bids, bid_schema.clone(), 1024);

// register memory tables
let bid_table = MemTable::try_new(bid_schema.clone(), vec![bids_batches])?;
Expand Down
3 changes: 2 additions & 1 deletion flock/src/datasource/nexmark/queries/q2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod tests {
use crate::error::Result;
use crate::runtime::executor::plan::physical_plan;
use crate::runtime::query::StreamWindow;
use crate::transmute::event_bytes_to_batch;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::collect;
use std::sync::Arc;
Expand All @@ -43,7 +44,7 @@ mod tests {
// events to record batches
let bm = events.bids.get(&Epoch::new(i)).unwrap();
let (bids, _) = bm.get(&0).unwrap();
let batches = NEXMarkSource::to_batch(bids, schema.clone());
let batches = event_bytes_to_batch(bids, schema.clone(), 1024);

// register memory table
let mut ctx = datafusion::execution::context::ExecutionContext::new();
Expand Down
5 changes: 3 additions & 2 deletions flock/src/datasource/nexmark/queries/q3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod tests {
use crate::error::Result;
use crate::runtime::executor::plan::physical_plan;
use crate::runtime::query::StreamWindow;
use crate::transmute::event_bytes_to_batch;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::collect;
use indoc::indoc;
Expand Down Expand Up @@ -65,11 +66,11 @@ mod tests {
// events to record batches
let am = events.auctions.get(&Epoch::new(i)).unwrap();
let (auctions, _) = am.get(&0).unwrap();
let auctions_batches = NEXMarkSource::to_batch(auctions, auction_schema.clone());
let auctions_batches = event_bytes_to_batch(auctions, auction_schema.clone(), 1024);

let pm = events.persons.get(&Epoch::new(i)).unwrap();
let (persons, _) = pm.get(&0).unwrap();
let person_batches = NEXMarkSource::to_batch(persons, person_schema.clone());
let person_batches = event_bytes_to_batch(persons, person_schema.clone(), 1024);

// register memory tables
let mut ctx = datafusion::execution::context::ExecutionContext::new();
Expand Down
5 changes: 3 additions & 2 deletions flock/src/datasource/nexmark/queries/q4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod tests {
use crate::error::Result;
use crate::runtime::executor::plan::physical_plan;
use crate::runtime::query::StreamWindow;
use crate::transmute::event_bytes_to_batch;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::collect;
use indoc::indoc;
Expand Down Expand Up @@ -65,11 +66,11 @@ mod tests {
// events to record batches
let am = events.auctions.get(&Epoch::new(i)).unwrap();
let (auctions, _) = am.get(&0).unwrap();
let auctions_batches = NEXMarkSource::to_batch(auctions, auction_schema.clone());
let auctions_batches = event_bytes_to_batch(auctions, auction_schema.clone(), 1024);

let bm = events.bids.get(&Epoch::new(i)).unwrap();
let (bids, _) = bm.get(&0).unwrap();
let bids_batches = NEXMarkSource::to_batch(bids, bid_schema.clone());
let bids_batches = event_bytes_to_batch(bids, bid_schema.clone(), 1024);

// register memory tables
let mut ctx = datafusion::execution::context::ExecutionContext::new();
Expand Down
3 changes: 2 additions & 1 deletion flock/src/datasource/nexmark/queries/q5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod tests {
use crate::error::{FlockError, Result};
use crate::runtime::executor::plan::physical_plan;
use crate::runtime::query::StreamWindow;
use crate::transmute::event_bytes_to_batch;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::collect;
use indoc::indoc;
Expand Down Expand Up @@ -90,7 +91,7 @@ mod tests {
}
let bm = events.bids.get(&Epoch::new(j)).unwrap();
let (bids, _) = bm.get(&0).unwrap();
bids_batches.push(NEXMarkSource::to_batch(bids, bid_schema.clone()));
bids_batches.push(event_bytes_to_batch(bids, bid_schema.clone(), 1024));
}

let old_batches = bids_batches.clone();
Expand Down
5 changes: 3 additions & 2 deletions flock/src/datasource/nexmark/queries/q6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod tests {
use crate::error::Result;
use crate::runtime::executor::plan::physical_plan;
use crate::runtime::query::StreamWindow;
use crate::transmute::event_bytes_to_batch;
use arrow::array::UInt64Array;
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
Expand Down Expand Up @@ -95,11 +96,11 @@ mod tests {
// events to record batches
let am = events.auctions.get(&Epoch::new(i)).unwrap();
let (auctions, _) = am.get(&0).unwrap();
let auctions_batches = NEXMarkSource::to_batch(auctions, auction_schema.clone());
let auctions_batches = event_bytes_to_batch(auctions, auction_schema.clone(), 1024);

let bm = events.bids.get(&Epoch::new(i)).unwrap();
let (bids, _) = bm.get(&0).unwrap();
let bids_batches = NEXMarkSource::to_batch(bids, bid_schema.clone());
let bids_batches = event_bytes_to_batch(bids, bid_schema.clone(), 1024);

// register memory tables
let mut ctx = datafusion::execution::context::ExecutionContext::new();
Expand Down
Loading

0 comments on commit 2f11574

Please sign in to comment.