Add kgen utility to dump random data into Kafka (#3775)
umanwizard authored Jul 28, 2020
1 parent 2effefd commit 2aaadfe
Showing 4 changed files with 153 additions and 0 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -12,6 +12,7 @@ members = [
"src/dataflow",
"src/expr",
"src/interchange",
"src/kafka-util",
"src/materialized",
"src/metabase",
"src/ore",
12 changes: 12 additions & 0 deletions src/kafka-util/Cargo.toml
@@ -0,0 +1,12 @@
[package]
name = "kafka-util"
version = "0.1.0"
authors = ["Brennan Vincent <brennan@materialize.io>"]
edition = "2018"
publish = false

[dependencies]
anyhow = "1.0"
clap = "2.33"
rand = "0.7"
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git", features = ["cmake-build", "libz-static"] }
130 changes: 130 additions & 0 deletions src/kafka-util/src/bin/kgen.rs
@@ -0,0 +1,130 @@
// Copyright Materialize, Inc. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::bail;
use clap::{App, Arg};
use rand::{distributions::Uniform, prelude::Distribution, thread_rng, Rng};
use rdkafka::{
error::KafkaError,
producer::{BaseRecord, DefaultProducerContext, ThreadedProducer},
types::RDKafkaError,
util::Timeout,
ClientConfig,
};
use std::{thread::sleep, time::Duration};

fn gen_value<LenDist: Distribution<usize>, DataDist: Distribution<u8>, R: Rng>(
out: &mut Vec<u8>,
ld: &LenDist,
dd: &DataDist,
rng: &mut R,
) {
let len = ld.sample(rng);
out.reserve(len);
// SAFETY: every byte is written by the loop below before it is ever read
unsafe {
out.set_len(len);
}
for i in 0..len {
out[i] = dd.sample(rng);
}
}

fn main() -> anyhow::Result<()> {
let matches = App::new("kgen")
.about("Put random data in Kafka")
.arg(
Arg::with_name("topic")
.short("t")
.long("topic")
.takes_value(true)
.required(true),
)
.arg(
Arg::with_name("num-records")
.short("n")
.long("num-messages")
.takes_value(true)
.required(true),
)
.arg(
// default 1
Arg::with_name("partitions")
.short("p")
.long("partitions")
.takes_value(true),
)
.arg(
Arg::with_name("min")
.short("m")
.long("min-message-size")
.takes_value(true)
.required(true),
)
.arg(
Arg::with_name("max")
.short("M")
.long("max-message-size")
.takes_value(true)
.required(true),
)
.arg(
Arg::with_name("bootstrap")
.short("b")
.long("bootstrap-server")
.takes_value(true),
)
.get_matches();
let topic = matches.value_of("topic").unwrap();
let n: usize = matches.value_of("num-records").unwrap().parse()?;
let partitions: usize = matches
.value_of("partitions")
.map(str::parse)
.transpose()?
.unwrap_or(1);
if partitions == 0 {
bail!("Partitions must be a positive number.");
}
let bootstrap = matches.value_of("bootstrap").unwrap_or("localhost:9092");
let min: usize = matches.value_of("min").unwrap().parse()?;
let max: usize = matches.value_of("max").unwrap().parse()?;

let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
.set("bootstrap.servers", bootstrap)
.create()?;
let ld = Uniform::new_inclusive(min, max);
let dd = Uniform::new_inclusive(0, 255);
let mut buf = vec![];
let mut rng = thread_rng();
for i in 0..n {
if i % 10000 == 0 {
eprintln!("Generating message {}", i);
}
gen_value(&mut buf, &ld, &dd, &mut rng);
let key = i.to_le_bytes();
let mut rec = BaseRecord::to(topic)
.key(&key)
.payload(&buf)
.partition((i % partitions) as i32);
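        // If librdkafka's local produce queue is full, back off and retry with the
        // same record; any other error is fatal.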
loop {
match producer.send(rec) {
Ok(()) => break,
Err((KafkaError::MessageProduction(RDKafkaError::QueueFull), rec2)) => {
rec = rec2;
sleep(Duration::from_secs(1));
}
Err((e, _)) => {
return Err(e.into());
}
}
}
}
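    // Block until every queued message has been delivered (or has failed).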
producer.flush(Timeout::Never);
Ok(())
}
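
Note on gen_value: it grows the buffer with unsafe { out.set_len(len) } and relies on the loop to initialize every byte before it is read. For comparison only (not part of this commit), a safe sketch that produces the same kind of payload with rand 0.7 while still reusing the buffer's allocation across calls:

use rand::{distributions::Uniform, prelude::Distribution, Rng};

// Illustrative safe variant (hypothetical, not in the commit): clear() keeps the
// existing allocation, and extend() writes every byte exactly once, so no
// uninitialized memory is ever exposed.
fn gen_value_safe<R: Rng>(
    out: &mut Vec<u8>,
    ld: &Uniform<usize>,
    dd: &Uniform<u8>,
    rng: &mut R,
) {
    let len = ld.sample(rng);
    out.clear();
    out.extend((0..len).map(|_| dd.sample(rng)));
}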
