Skip to content

Commit

Permalink
feat!: nexmark q10 on cloud functions (#392)
Browse files Browse the repository at this point in the history
  • Loading branch information
gangliao authored Dec 30, 2021
1 parent 097d3d5 commit 4832a30
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
20 changes: 12 additions & 8 deletions benchmarks/nexmark/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub struct NexmarkBenchmarkOpt {
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
benchmark(NexmarkBenchmarkOpt::from_args()).await?;
benchmark(&mut NexmarkBenchmarkOpt::from_args()).await?;
Ok(())
}

Expand Down Expand Up @@ -119,13 +119,17 @@ pub async fn register_nexmark_tables() -> Result<DataFusionExecutionContext> {
Ok(ctx)
}

pub fn create_nexmark_source(opt: &NexmarkBenchmarkOpt) -> NEXMarkSource {
pub fn create_nexmark_source(opt: &mut NexmarkBenchmarkOpt) -> NEXMarkSource {
let window = match opt.query_number {
0 | 1 | 2 | 3 | 4 | 6 | 9 | 13 => StreamWindow::ElementWise,
0..=4 | 6 | 9 | 10 | 13 => StreamWindow::ElementWise,
5 => StreamWindow::HoppingWindow((10, 5)),
7..=8 => StreamWindow::TumblingWindow(Schedule::Seconds(10)),
_ => unreachable!(),
};

if opt.query_number == 10 {
opt.data_sink_type = 1; // AWS S3
}
NEXMarkSource::new(opt.seconds, opt.generators, opt.events_per_second, window)
}

Expand Down Expand Up @@ -267,17 +271,17 @@ async fn create_file_system() -> Result<String> {
}

#[allow(dead_code)]
async fn benchmark(opt: NexmarkBenchmarkOpt) -> Result<()> {
async fn benchmark(opt: &mut NexmarkBenchmarkOpt) -> Result<()> {
info!(
"Running the NEXMark benchmark with the following options: {:?}",
opt
);
let query_number = opt.query_number;
let nexmark_conf = create_nexmark_source(&opt);
let nexmark_conf = create_nexmark_source(opt);

let mut ctx = register_nexmark_tables().await?;
let plan = physical_plan(&mut ctx, &nexmark_query(query_number)).await?;
let worker = create_nexmark_functions(&opt, nexmark_conf.window.clone(), plan).await?;
let worker = create_nexmark_functions(opt, nexmark_conf.window.clone(), plan).await?;

// The source generator function needs the metadata to determine the type of the
// workers such as single function or a group. We don't want to keep this info
Expand Down Expand Up @@ -372,13 +376,13 @@ mod tests {

#[tokio::test]
async fn nexmark_sql_queries() -> Result<()> {
let opt = NexmarkBenchmarkOpt {
let mut opt = NexmarkBenchmarkOpt {
generators: 1,
seconds: 5,
events_per_second: 10_000,
..Default::default()
};
let conf = create_nexmark_source(&opt);
let conf = create_nexmark_source(&mut opt);
let (event, _) = Arc::new(conf.generate_data()?)
.select(1, 0)
.expect("Failed to select event.");
Expand Down
2 changes: 1 addition & 1 deletion flock/src/datasource/nexmark/nexmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl DataStream for NEXMarkStream {

let batch_size = *FLOCK_SYNC_GRANULE_SIZE;
match query_number.expect("Query number is not set.") {
0 | 1 | 2 | 5 | 7 => Ok(to_payload(
0 | 1 | 2 | 5 | 7 | 10 => Ok(to_payload(
&event_bytes_to_batch(&event.bids, NEXMARK_BID.clone(), batch_size),
&[],
uuid,
Expand Down
4 changes: 2 additions & 2 deletions flock_bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ if [ $seconds -gt 60 ]; then
seconds=60
fi

if [ $query -gt 9 ]; then
echo $(echored "Error: Query number must be between 0 and 9.")
if [ $query -gt 10 ]; then
echo $(echored "Error: Query number must be between 0 and 10.")
exit
fi

Expand Down

0 comments on commit 4832a30

Please sign in to comment.