Commit: Fix
iambriccardo committed Jul 19, 2024
1 parent 05b1317 commit b7decf2
Showing 3 changed files with 159 additions and 116 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions relay-metrics/Cargo.toml
@@ -22,6 +22,8 @@ fnv = { workspace = true }
 hash32 = { workspace = true }
 hashbrown = { workspace = true }
 itertools = { workspace = true }
+rand = { workspace = true }
+rand_pcg = { workspace = true }
 relay-base-schema = { workspace = true }
 relay-cardinality = { workspace = true }
 relay-cogs = { workspace = true }
272 changes: 156 additions & 116 deletions relay-metrics/benches/benches.rs
@@ -1,135 +1,192 @@
-use std::collections::BTreeMap;
-use std::fmt;
-
 use criterion::{black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
+use rand::distributions::Uniform;
+use rand::Rng;
+use rand_pcg::Pcg32;
 use relay_base_schema::project::ProjectKey;
 use relay_common::time::UnixTimestamp;
 use relay_metrics::{
     aggregator::{Aggregator, AggregatorConfig},
     Bucket, BucketValue, DistributionValue, FiniteF64,
 };
+use std::cell::RefCell;
+use std::collections::BTreeMap;
+use std::fmt;
 
-/// Struct representing a testcase for which insert + flush are timed.
-struct MetricInput {
-    num_metrics: usize,
-    num_metric_names: usize,
-    num_project_keys: usize,
-    bucket: Bucket,
-}
-
-impl MetricInput {
-    // Separate from actual metric insertion as we do not want to time this function, its logic and
-    // especially not creation and destruction of a large vector.
-    //
-    // In theory we could also create all vectors upfront instead of having a MetricInput struct,
-    // but that would take a lot of memory. This way we can at least free some RAM between
-    // benchmarks.
-    fn get_buckets(&self) -> Vec<(ProjectKey, Bucket)> {
-        let mut rv = Vec::new();
-        for i in 0..self.num_metrics {
-            let key_id = i % self.num_project_keys;
-            let metric_name = format!("c:transactions/foo{}", i % self.num_metric_names);
-            let mut bucket = self.bucket.clone();
-            bucket.name = metric_name.into();
-            let key = ProjectKey::parse(&format!("{key_id:0width$x}", width = 32)).unwrap();
-            rv.push((key, bucket));
-        }
-
-        rv
-    }
-}
+struct NumbersGenerator {
+    min: usize,
+    max: usize,
+    current_value: RefCell<usize>,
+}
+
+impl NumbersGenerator {
+    fn new(min: usize, max: usize) -> Self {
+        Self {
+            min,
+            max,
+            current_value: RefCell::new(0),
+        }
+    }
+
+    fn next(&self) -> usize {
+        let seed = *self.current_value.borrow() as u128;
+        let mut generator = Pcg32::new((seed >> 64) as u64, seed as u64);
+        let dist = Uniform::new(self.min, self.max + 1);
+        let value = generator.sample(dist);
+
+        *self.current_value.borrow_mut() += 1;
+
+        value
+    }
+}
+
+struct BucketsGenerator {
+    base_timestamp: UnixTimestamp,
+    percentage_backdated: f32,
+    num_buckets: usize,
+    metric_ids_generator: NumbersGenerator,
+    project_keys_generator: NumbersGenerator,
+    timestamp_shifts_generator: NumbersGenerator,
+}
+
+impl BucketsGenerator {
+    fn get_buckets(&self) -> Vec<(ProjectKey, Bucket)> {
+        let mut buckets = Vec::with_capacity(self.num_buckets);
+
+        let backdated = ((self.num_buckets as f32 * self.percentage_backdated) as usize)
+            .clamp(0, self.num_buckets);
+        let non_backdated = self.num_buckets - backdated;
+
+        for _ in 0..backdated {
+            buckets.push(self.build_bucket(true));
+        }
+
+        for _ in 0..non_backdated {
+            buckets.push(self.build_bucket(false));
+        }
+
+        buckets
+    }
+
+    fn build_bucket(&self, is_backdated: bool) -> (ProjectKey, Bucket) {
+        let time_shift = self.timestamp_shifts_generator.next();
+        let timestamp = if is_backdated {
+            self.base_timestamp.as_secs() - (time_shift as u64)
+        } else {
+            self.base_timestamp.as_secs() + (time_shift as u64)
+        };
+
+        let name = format!("c:transactions/foo_{}", self.metric_ids_generator.next());
+        let bucket = Bucket {
+            timestamp: UnixTimestamp::from_secs(timestamp),
+            width: 0,
+            name: name.into(),
+            value: BucketValue::counter(42.into()),
+            tags: BTreeMap::new(),
+            metadata: Default::default(),
+        };
+
+        let key_id = self.project_keys_generator.next();
+        let project_key = ProjectKey::parse(&format!("{key_id:0width$x}", width = 32)).unwrap();
+
+        (project_key, bucket)
+    }
+}
 
-impl fmt::Display for MetricInput {
+impl fmt::Display for BucketsGenerator {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(
-            f,
-            "{} {:?} metrics with {} names, {} keys",
-            self.num_metrics,
-            self.bucket.value.ty(),
-            self.num_metric_names,
-            self.num_project_keys
-        )
+        write!(f, "{} buckets", self.num_buckets)
     }
 }
 
+fn bench_distribution(c: &mut Criterion) {
+    let mut group = c.benchmark_group("DistributionValue");
+
+    for size in [1, 10, 100, 1000, 10_000, 100_000, 1_000_000] {
+        let values = std::iter::from_fn(|| Some(rand::random()))
+            .filter_map(FiniteF64::new)
+            .take(size as usize)
+            .collect::<Vec<FiniteF64>>();
+
+        group.throughput(criterion::Throughput::Elements(size));
+        group.bench_with_input(BenchmarkId::from_parameter(size), &values, |b, values| {
+            b.iter(|| DistributionValue::from_iter(black_box(values.iter().copied())))
+        });
+    }
+
+    group.finish();
+}
+
 fn bench_insert_and_flush(c: &mut Criterion) {
     let config = AggregatorConfig {
-        bucket_interval: 1000,
+        bucket_interval: 10,
         initial_delay: 0,
         ..Default::default()
     };
 
-    let counter = Bucket {
-        timestamp: UnixTimestamp::now(),
-        width: 0,
-        name: "c:transactions/foo@none".into(),
-        value: BucketValue::counter(42.into()),
-        tags: BTreeMap::new(),
-        metadata: Default::default(),
-    };
-
-    let inputs = [
-        MetricInput {
-            num_metrics: 1,
-            num_metric_names: 1,
-            bucket: counter.clone(),
-            num_project_keys: 1,
-        },
-        // scaling num_metrics
-        MetricInput {
-            num_metrics: 100,
-            num_metric_names: 1,
-            bucket: counter.clone(),
-            num_project_keys: 1,
-        },
-        MetricInput {
-            num_metrics: 1000,
-            num_metric_names: 1,
-            bucket: counter.clone(),
-            num_project_keys: 1,
-        },
-        // scaling num_metric_names
-        MetricInput {
-            num_metrics: 100,
-            num_metric_names: 100,
-            bucket: counter.clone(),
-            num_project_keys: 1,
-        },
-        MetricInput {
-            num_metrics: 1000,
-            num_metric_names: 1000,
-            bucket: counter.clone(),
-            num_project_keys: 1,
-        },
-        // scaling num_project_keys
-        MetricInput {
-            num_metrics: 100,
-            num_metric_names: 1,
-            bucket: counter.clone(),
-            num_project_keys: 100,
-        },
-        MetricInput {
-            num_metrics: 1000,
-            num_metric_names: 1,
-            bucket: counter,
-            num_project_keys: 1000,
-        },
+    // Metrics of same project
+    // Metrics of the same name
+    // All backdated metrics
+    // All non-backdated metrics
+    // Mix of backdated and non-backdated
+    let inputs = vec![
+        (
+            "multiple metrics of the same project",
+            BucketsGenerator {
+                base_timestamp: UnixTimestamp::now(),
+                percentage_backdated: 0.5,
+                num_buckets: 100_000,
+                metric_ids_generator: NumbersGenerator::new(1, 100),
+                project_keys_generator: NumbersGenerator::new(1, 1),
+                timestamp_shifts_generator: NumbersGenerator::new(1, 10),
+            },
+        ),
+        (
+            "same metric on different projects",
+            BucketsGenerator {
+                base_timestamp: UnixTimestamp::now(),
+                percentage_backdated: 0.5,
+                num_buckets: 100_000,
+                metric_ids_generator: NumbersGenerator::new(1, 1),
+                project_keys_generator: NumbersGenerator::new(1, 100),
+                timestamp_shifts_generator: NumbersGenerator::new(1, 10),
+            },
+        ),
+        (
+            "all backdated metrics",
+            BucketsGenerator {
+                base_timestamp: UnixTimestamp::now(),
+                percentage_backdated: 1.0,
+                num_buckets: 100_000,
+                metric_ids_generator: NumbersGenerator::new(1, 100),
+                project_keys_generator: NumbersGenerator::new(1, 100),
+                timestamp_shifts_generator: NumbersGenerator::new(10, 50),
+            },
+        ),
+        (
+            "all non-backdated metrics",
+            BucketsGenerator {
+                base_timestamp: UnixTimestamp::now(),
+                percentage_backdated: 0.0,
+                num_buckets: 100_000,
+                metric_ids_generator: NumbersGenerator::new(1, 100),
+                project_keys_generator: NumbersGenerator::new(1, 100),
+                timestamp_shifts_generator: NumbersGenerator::new(10, 50),
+            },
+        ),
     ];
 
-    for input in &inputs {
+    for (input_name, input) in &inputs {
         c.bench_with_input(
-            BenchmarkId::new("bench_insert_metrics", input),
+            BenchmarkId::new("bench_insert_metrics", input_name),
             &input,
-            |b, &input| {
+            |b, input| {
                 b.iter_batched(
                     || {
                         let aggregator: Aggregator = Aggregator::new(config.clone());
                         (aggregator, input.get_buckets())
                     },
                     |(mut aggregator, buckets)| {
                         for (project_key, bucket) in buckets {
-                            aggregator.merge(project_key, bucket, None).unwrap();
+                            black_box(aggregator.merge(project_key, bucket, None).unwrap());
                         }
                     },
                     BatchSize::SmallInput,
@@ -138,7 +195,7 @@ fn bench_insert_and_flush(c: &mut Criterion) {
         );
 
         c.bench_with_input(
-            BenchmarkId::new("bench_flush_metrics", input),
+            BenchmarkId::new("bench_flush_metrics", input_name),
             &input,
             |b, &input| {
                 b.iter_batched(
@@ -152,7 +209,7 @@ fn bench_insert_and_flush(c: &mut Criterion) {
                     |mut aggregator| {
                         // XXX: Ideally we'd want to test the entire try_flush here, but spawning
                         // a service is too much work here.
-                        aggregator.pop_flush_buckets(false);
+                        black_box(aggregator.pop_flush_buckets(false));
                     },
                     BatchSize::SmallInput,
                 )
@@ -161,23 +218,6 @@ fn bench_insert_and_flush(c: &mut Criterion) {
     }
 }
 
-fn bench_distribution(c: &mut Criterion) {
-    let mut group = c.benchmark_group("DistributionValue");
-
-    for size in [1, 10, 100, 1000, 10_000, 100_000, 1_000_000] {
-        let values = std::iter::from_fn(|| Some(rand::random()))
-            .filter_map(FiniteF64::new)
-            .take(size as usize)
-            .collect::<Vec<FiniteF64>>();
-
-        group.throughput(criterion::Throughput::Elements(size));
-        group.bench_with_input(BenchmarkId::from_parameter(size), &values, |b, values| {
-            b.iter(|| DistributionValue::from_iter(black_box(values.iter().copied())))
-        });
-    }
-
-    group.finish();
-}
-
-criterion_group!(benches, bench_insert_and_flush, bench_distribution);
+criterion_group!(benches, bench_insert_and_flush);
+// criterion_group!(benches, bench_insert_and_flush, bench_distribution);
 criterion_main!(benches);
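
A note on the generator introduced above: NumbersGenerator re-seeds a fresh Pcg32 from a counter that it increments on every call, so the i-th value it produces is a pure function of i, and the benchmark inputs are identical from run to run. The following standalone sketch demonstrates that property; the nth_value helper is hypothetical and written here only for illustration:

use rand::distributions::Uniform;
use rand::Rng;
use rand_pcg::Pcg32;

// Mirrors the seeding scheme in the diff: the PRNG state is derived
// entirely from the call counter `i`, so the i-th draw is deterministic.
fn nth_value(i: u128, min: usize, max: usize) -> usize {
    let mut rng = Pcg32::new((i >> 64) as u64, i as u64);
    rng.sample(Uniform::new(min, max + 1)) // `max + 1` makes `max` inclusive
}

fn main() {
    let first: Vec<_> = (0..5u128).map(|i| nth_value(i, 1, 100)).collect();
    let second: Vec<_> = (0..5u128).map(|i| nth_value(i, 1, 100)).collect();
    assert_eq!(first, second); // same counter values, same outputs
}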
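The timing structure is also worth spelling out: both benchmarks use Criterion's iter_batched, whose first closure builds the aggregator and input buckets outside the measured region, while only the second closure is timed; this is why get_buckets() lives in the setup closure. A minimal sketch of that split, with hypothetical expensive_setup and measured_work stand-ins:

use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion};

// Hypothetical stand-ins for bucket generation and the timed merge loop.
fn expensive_setup() -> Vec<u64> {
    (0..10_000).collect()
}

fn measured_work(input: Vec<u64>) -> u64 {
    input.iter().sum()
}

fn bench_example(c: &mut Criterion) {
    c.bench_function("batched_example", |b| {
        b.iter_batched(
            expensive_setup,                         // runs outside the timing window
            |input| black_box(measured_work(input)), // only this closure is timed
            BatchSize::SmallInput,
        )
    });
}

criterion_group!(example, bench_example);
criterion_main!(example);

Assuming the bench target keeps its default name, the suite itself should be runnable with cargo bench -p relay-metrics --bench benches.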
