Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metric-stats): Emit accepted volume #3281

Merged
merged 7 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
**Internal**:

- Apply rate limits to span metrics. ([#3255](https://github.com/getsentry/relay/pull/3255))
- Implement volume metric stats. ([#3281](https://github.com/getsentry/relay/pull/3281))

## 24.3.0

Expand Down
33 changes: 19 additions & 14 deletions relay-base-schema/src/metrics/mri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,26 +136,31 @@ impl MetricNamespace {
/// Returns all namespaces/variants of this enum.
pub fn all() -> [Self; 7] {
[
MetricNamespace::Sessions,
MetricNamespace::Transactions,
MetricNamespace::Spans,
MetricNamespace::Profiles,
MetricNamespace::Custom,
MetricNamespace::Stats,
MetricNamespace::Unsupported,
Self::Sessions,
Self::Transactions,
Self::Spans,
Self::Profiles,
Self::Custom,
Self::Stats,
Self::Unsupported,
]
}

/// Returns `true` if metric stats are enabled for this namespace.
///
/// Only [`MetricNamespace::Custom`] has metric stats enabled; every other
/// namespace returns `false`.
pub fn has_metric_stats(&self) -> bool {
    // Spelled out exhaustively so that adding a namespace forces a decision here.
    match self {
        Self::Custom => true,
        Self::Sessions
        | Self::Transactions
        | Self::Spans
        | Self::Profiles
        | Self::Stats
        | Self::Unsupported => false,
    }
}

/// Returns the string representation for this metric type.
pub fn as_str(&self) -> &'static str {
match self {
MetricNamespace::Sessions => "sessions",
MetricNamespace::Transactions => "transactions",
MetricNamespace::Spans => "spans",
MetricNamespace::Profiles => "profiles",
MetricNamespace::Custom => "custom",
MetricNamespace::Stats => "metric_stats",
MetricNamespace::Unsupported => "unsupported",
Self::Sessions => "sessions",
Self::Transactions => "transactions",
Self::Spans => "spans",
Self::Profiles => "profiles",
Self::Custom => "custom",
Self::Stats => "metric_stats",
Self::Unsupported => "unsupported",
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions relay-dynamic-config/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,17 @@ pub struct Options {
)]
pub metric_bucket_dist_encodings: BucketEncodings,

/// Rollout rate for metric stats.
///
/// Rate needs to be between `0.0` and `1.0`.
/// If set to `1.0` all organizations will have metric stats enabled.
#[serde(
rename = "relay.metric-stats.rollout-rate",
deserialize_with = "default_on_error",
skip_serializing_if = "is_default"
)]
pub metric_stats_rollout_rate: f32,

/// All other unknown options.
#[serde(flatten)]
other: HashMap<String, Value>,
Expand Down
8 changes: 4 additions & 4 deletions relay-metrics/src/bucket.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};
use std::hash::Hash;
use std::iter::FusedIterator;
use std::num::NonZeroU64;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::{fmt, mem};

Expand Down Expand Up @@ -755,7 +755,7 @@ pub struct BucketMetadata {
///
/// For example: Merging two un-merged buckets will yield a total
/// of `2` merges.
pub merges: NonZeroU64,
pub merges: NonZeroU32,
}

impl BucketMetadata {
Expand All @@ -764,14 +764,14 @@ impl BucketMetadata {
/// The new metadata is initialized with `1` merge.
pub fn new() -> Self {
Self {
merges: NonZeroU64::MIN,
merges: NonZeroU32::MIN,
}
}

/// Whether the metadata does not contain more information than the default.
pub fn is_default(&self) -> bool {
let Self { merges } = self;
*merges == NonZeroU64::MIN
*merges == NonZeroU32::MIN
}

/// Merges another metadata object into the current one.
Expand Down
8 changes: 8 additions & 0 deletions relay-metrics/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
use std::collections::BTreeMap;
use std::fmt;
use std::ops::Range;
use std::sync::Arc;

use crate::bucket::Bucket;
use crate::BucketValue;
Expand Down Expand Up @@ -376,6 +377,13 @@ impl<'a> BucketView<'a> {
&self.inner.name
}

/// Returns the name of the bucket.
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
///
/// Caller holds shared ownership of the string.
pub fn clone_name(&self) -> Arc<str> {
    // Hands out another handle to the name held by the underlying bucket.
    let name = &self.inner.name;
    Arc::clone(name)
}

/// Value of the bucket view.
pub fn value(&self) -> BucketViewValue<'a> {
match &self.inner.value {
Expand Down
2 changes: 2 additions & 0 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ mod endpoints;
mod envelope;
mod extractors;
mod http;
#[cfg(feature = "processing")]
mod metric_stats;
mod metrics_extraction;
mod middlewares;
mod service;
Expand Down
249 changes: 249 additions & 0 deletions relay-server/src/metric_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
use std::collections::BTreeMap;
use std::sync::{Arc, OnceLock};

use relay_config::Config;
use relay_metrics::{
Aggregator, Bucket, BucketValue, BucketView, MergeBuckets, MetricResourceIdentifier,
UnixTimestamp,
};
use relay_quotas::Scoping;
use relay_system::Addr;

use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::Outcome;

/// Returns the MRI of the volume metric, `c:metric_stats/volume@none`.
///
/// The string is created on first use and stored in a process-wide static;
/// every call returns a new `Arc` handle to that same string.
fn volume_metric_mri() -> Arc<str> {
    static MRI: OnceLock<Arc<str>> = OnceLock::new();

    let shared = MRI.get_or_init(|| Arc::from("c:metric_stats/volume@none"));
    Arc::clone(shared)
}

/// Tracks stats about metrics.
///
/// Metric stats are similar to outcomes for envelopes: they record
/// the final "fate" of a metric and its properties.
///
/// Unlike outcomes, metric stats are tracked on a per-MRI basis
/// and contain additional metadata, like the cardinality of a metric.
#[derive(Clone, Debug)]
pub struct MetricStats {
config: Arc<Config>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is config only used to check processing_enabled? If so, we could give MetricStats a noop variant instead, something like

enum MetricStats {
    Disabled,
    Enabled {
        global_config: GlobalConfigHandle,
        aggregator: Addr<Aggregator>,
    }
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes dealing with self awkward, since I have to pattern match it in every method.

global_config: GlobalConfigHandle,
aggregator: Addr<Aggregator>,
}

impl MetricStats {
    /// Creates a new [`MetricStats`] instance.
    ///
    /// The instance emits its volume metrics to the given `aggregator` and reads
    /// the rollout rate from `global_config` each time a bucket is tracked.
    pub fn new(
        config: Arc<Config>,
        global_config: GlobalConfigHandle,
        aggregator: Addr<Aggregator>,
    ) -> Self {
        Self {
            config,
            global_config,
            aggregator,
        }
    }

    /// Tracks the metric volume and outcome for the bucket.
    ///
    /// Nothing is emitted when processing is disabled, when the organization in
    /// `scoping` is not part of the rollout, or when no volume metric can be
    /// derived from the bucket (see [`Self::to_volume_metric`]).
    pub fn track(&self, scoping: Scoping, bucket: &BucketView<'_>, outcome: Outcome) {
        if !self.config.processing_enabled() || !self.is_rolled_out(scoping.organization_id) {
            return;
        }

        let Some(volume) = self.to_volume_metric(bucket, &outcome) else {
            return;
        };

        relay_log::trace!(
            "Tracking volume of {} for mri '{}': {}",
            bucket.metadata().merges.get(),
            bucket.name(),
            outcome
        );
        self.aggregator
            .send(MergeBuckets::new(scoping.project_key, vec![volume]));
    }

    /// Returns `true` if metric stats are rolled out for the organization.
    ///
    /// The organization id is mapped onto a ratio in `[0.0, 1.0)` and compared
    /// against the `metric_stats_rollout_rate` global option.
    fn is_rolled_out(&self, organization_id: u64) -> bool {
        let rate = self
            .global_config
            .current()
            .options
            .metric_stats_rollout_rate;

        // The comparison must be strict: the ratio is exactly `0.0` for every id
        // that is a multiple of 100000, and those organizations must stay disabled
        // at a rate of `0.0`. The largest possible ratio is `0.99999`, so a rate of
        // `1.0` still enables every organization.
        ((organization_id % 100000) as f32 / 100000.0f32) < rate
    }

    /// Builds the volume counter bucket for `bucket` and `outcome`.
    ///
    /// Returns `None` if the bucket's volume is zero, if its name cannot be parsed
    /// as an MRI, or if metric stats are not enabled for the metric's namespace.
    ///
    /// The counter value is the number of merges recorded in the bucket metadata.
    /// The tags carry the original MRI, its namespace, the outcome id and, when the
    /// outcome has one, the outcome reason.
    fn to_volume_metric(&self, bucket: &BucketView<'_>, outcome: &Outcome) -> Option<Bucket> {
        let volume = bucket.metadata().merges.get();
        if volume == 0 {
            return None;
        }

        let namespace = MetricResourceIdentifier::parse(bucket.name())
            .ok()?
            .namespace;
        if !namespace.has_metric_stats() {
            return None;
        }

        let mut tags = BTreeMap::from([
            ("mri".to_owned(), bucket.name().to_string()),
            ("mri.namespace".to_owned(), namespace.to_string()),
            (
                "outcome.id".to_owned(),
                outcome.to_outcome_id().as_u8().to_string(),
            ),
        ]);

        if let Some(reason) = outcome.to_reason() {
            tags.insert("outcome.reason".to_owned(), reason.into_owned());
        }

        Some(Bucket {
            timestamp: UnixTimestamp::now(),
            width: 0,
            name: volume_metric_mri(),
            value: BucketValue::Counter(volume.into()),
            tags,
            metadata: Default::default(),
        })
    }
}

#[cfg(test)]
mod tests {
    use relay_base_schema::project::{ProjectId, ProjectKey};
    use relay_dynamic_config::GlobalConfig;
    use relay_quotas::ReasonCode;
    use tokio::sync::mpsc::UnboundedReceiver;

    use super::*;

    /// Builds a tag map from `(key, value)` pairs, each followed by a comma.
    macro_rules! tags {
        ($(($key:expr, $value:expr),)*) => {
            BTreeMap::from([
                $(($key.to_owned(), $value.to_owned())),*
            ])
        }
    }

    /// Creates a [`MetricStats`] with processing enabled and the given rollout rate,
    /// together with the receiving end of its aggregator address.
    fn create_metric_stats(rollout_rate: f32) -> (MetricStats, UnboundedReceiver<Aggregator>) {
        let config = Config::from_json_value(serde_json::json!({
            "processing": {
                "enabled": true,
                "kafka_config": [],
            }
        }))
        .unwrap();

        let mut global = GlobalConfig::default();
        global.options.metric_stats_rollout_rate = rollout_rate;

        let (aggregator, receiver) = Addr::custom();
        let stats = MetricStats::new(
            Arc::new(config),
            GlobalConfigHandle::fixed(global),
            aggregator,
        );

        (stats, receiver)
    }

    /// Scoping shared by all tests in this module.
    fn scoping() -> Scoping {
        Scoping {
            organization_id: 42,
            project_id: ProjectId::new(21),
            project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
            key_id: Some(17),
        }
    }

    /// Receives the next message, asserts it is a `MergeBuckets` for `project_key`
    /// holding exactly one bucket, and returns that bucket.
    fn recv_single_bucket(
        receiver: &mut UnboundedReceiver<Aggregator>,
        project_key: ProjectKey,
    ) -> Bucket {
        let Aggregator::MergeBuckets(mb) = receiver.blocking_recv().unwrap() else {
            panic!();
        };
        assert_eq!(mb.project_key(), project_key);

        let mut buckets = mb.buckets();
        assert_eq!(buckets.len(), 1);
        buckets.pop().unwrap()
    }

    #[test]
    fn test_metric_stats_volume() {
        let (stats, mut receiver) = create_metric_stats(1.0);

        let scoping = scoping();
        let mut input = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();

        // First an un-merged bucket, then the same bucket with 42 merges in total.
        stats.track(scoping, &BucketView::from(&input), Outcome::Accepted);

        input.metadata.merges = input.metadata.merges.saturating_add(41);
        stats.track(
            scoping,
            &BucketView::from(&input),
            Outcome::RateLimited(Some(ReasonCode::new("foobar"))),
        );

        // Drop the sender so that the final receive below yields `None`.
        drop(stats);

        let accepted = recv_single_bucket(&mut receiver, scoping.project_key);
        assert_eq!(&*accepted.name, "c:metric_stats/volume@none");
        assert_eq!(accepted.value, BucketValue::Counter(1.into()));
        assert_eq!(
            accepted.tags,
            tags!(
                ("mri", "d:custom/rt@millisecond"),
                ("mri.namespace", "custom"),
                ("outcome.id", "0"),
            )
        );

        let rate_limited = recv_single_bucket(&mut receiver, scoping.project_key);
        assert_eq!(&*rate_limited.name, "c:metric_stats/volume@none");
        assert_eq!(rate_limited.value, BucketValue::Counter(42.into()));
        assert_eq!(
            rate_limited.tags,
            tags!(
                ("mri", "d:custom/rt@millisecond"),
                ("mri.namespace", "custom"),
                ("outcome.id", "2"),
                ("outcome.reason", "foobar"),
            )
        );

        assert!(receiver.blocking_recv().is_none());
    }

    #[test]
    fn test_metric_stats_rollout_rate_disabled() {
        let (stats, mut receiver) = create_metric_stats(0.0);

        let input = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
        stats.track(scoping(), &BucketView::from(&input), Outcome::Accepted);

        drop(stats);

        // Nothing may be emitted with a rollout rate of zero.
        assert!(receiver.blocking_recv().is_none());
    }

    #[test]
    fn test_metric_stats_disabled_namespace() {
        let (stats, mut receiver) = create_metric_stats(1.0);

        let input =
            Bucket::parse(b"transactions/rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
        stats.track(scoping(), &BucketView::from(&input), Outcome::Accepted);

        drop(stats);

        // The transactions namespace has no metric stats, so nothing is emitted.
        assert!(receiver.blocking_recv().is_none());
    }
}
Loading
Loading