Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metric-stats): Emit accepted volume #3281

Merged
merged 7 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
global config and tests
  • Loading branch information
Dav1dde committed Mar 19, 2024
commit 74e0747cc83315e4f024528d5dc44331a28d8ca1
5 changes: 0 additions & 5 deletions relay-dynamic-config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ pub enum Feature {
/// Serialized as `organizations:custom-metrics`.
#[serde(rename = "organizations:custom-metrics")]
CustomMetrics,
/// Enables metric stats.
///
/// Serialized as `organizations:metric-stats`.
#[serde(rename = "organizations:metric-stats")]
MetricStats,

/// Enable processing profiles.
///
Expand Down
11 changes: 11 additions & 0 deletions relay-dynamic-config/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,17 @@ pub struct Options {
)]
pub metric_bucket_dist_encodings: BucketEncodings,

/// Rollout rate for metric stats.
///
/// Rate needs to be between `0.0` and `1.0`.
/// If set to `1.0` all organizations will have metric stats enabled.
#[serde(
rename = "relay.metric-stats.rollout-rate",
deserialize_with = "default_on_error",
skip_serializing_if = "is_default"
)]
pub metric_stats_rollout_rate: f32,

/// All other unknown options.
#[serde(flatten)]
other: HashMap<String, Value>,
Expand Down
161 changes: 158 additions & 3 deletions relay-server/src/metric_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use relay_metrics::{
use relay_quotas::Scoping;
use relay_system::Addr;

use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::Outcome;

fn volume_metric_mri() -> Arc<str> {
Expand All @@ -27,18 +28,27 @@ fn volume_metric_mri() -> Arc<str> {
#[derive(Clone, Debug)]
pub struct MetricStats {
config: Arc<Config>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is config only used to check processing_enabled? If so, we could give MetricStats a noop variant instead, something like

enum MetricStats {
    Disabled,
    Enabled {
        global_config: GlobalConfigHandle,
        aggregator: Addr<Aggregator>,
    }
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes dealing with self awkward, since I have to pattern match it in every method.

global_config: GlobalConfigHandle,
aggregator: Addr<Aggregator>,
}

impl MetricStats {
/// Creates a new [`MetricStats`] instance.
pub fn new(config: Arc<Config>, aggregator: Addr<Aggregator>) -> Self {
Self { config, aggregator }
pub fn new(
config: Arc<Config>,
global_config: GlobalConfigHandle,
aggregator: Addr<Aggregator>,
) -> Self {
Self {
config,
global_config,
aggregator,
}
}

/// Tracks the metric volume and outcome for the bucket.
pub fn track(&self, scoping: Scoping, bucket: &BucketView<'_>, outcome: Outcome) {
if !self.config.processing_enabled() {
if !self.config.processing_enabled() || !self.is_rolled_out(scoping.organization_id) {
return;
}

Expand All @@ -56,6 +66,16 @@ impl MetricStats {
.send(MergeBuckets::new(scoping.project_key, vec![volume]));
}

fn is_rolled_out(&self, organization_id: u64) -> bool {
let rate = self
.global_config
.current()
.options
.metric_stats_rollout_rate;

((organization_id % 100000) as f32 / 100000.0f32) <= rate
}

fn to_volume_metric(&self, bucket: &BucketView<'_>, outcome: &Outcome) -> Option<Bucket> {
let volume = bucket.metadata().merges.get();
if volume == 0 {
Expand Down Expand Up @@ -92,3 +112,138 @@ impl MetricStats {
})
}
}

#[cfg(test)]
mod tests {
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_dynamic_config::GlobalConfig;
use relay_quotas::ReasonCode;
use tokio::sync::mpsc::UnboundedReceiver;

use super::*;

fn create_metric_stats(rollout_rate: f32) -> (MetricStats, UnboundedReceiver<Aggregator>) {
let config = Config::from_json_value(serde_json::json!({
"processing": {
"enabled": true,
"kafka_config": [],
}
}))
.unwrap();

let mut global_config = GlobalConfig::default();
global_config.options.metric_stats_rollout_rate = rollout_rate;
let global_config = GlobalConfigHandle::fixed(global_config);

let (addr, receiver) = Addr::custom();
let ms = MetricStats::new(Arc::new(config), global_config, addr);

(ms, receiver)
}

fn scoping() -> Scoping {
Scoping {
organization_id: 42,
project_id: ProjectId::new(21),
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
}
}

macro_rules! tags {
($(($key:expr, $value:expr),)*) => {
BTreeMap::from([
$(($key.to_owned(), $value.to_owned())),*
])
}
}

#[test]
fn test_metric_stats_volume() {
let (ms, mut receiver) = create_metric_stats(1.0);

let scoping = scoping();
let mut bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();

ms.track(scoping, &BucketView::from(&bucket), Outcome::Accepted);

bucket.metadata.merges = bucket.metadata.merges.saturating_add(41);
ms.track(
scoping,
&BucketView::from(&bucket),
Outcome::RateLimited(Some(ReasonCode::new("foobar"))),
);

drop(ms);

let Aggregator::MergeBuckets(mb) = receiver.blocking_recv().unwrap() else {
panic!();
};
assert_eq!(mb.project_key(), scoping.project_key);

let mut buckets = mb.buckets();
assert_eq!(buckets.len(), 1);
let bucket = buckets.pop().unwrap();

assert_eq!(&*bucket.name, "c:metric_stats/volume@none");
assert_eq!(bucket.value, BucketValue::Counter(1.into()));
assert_eq!(
bucket.tags,
tags!(
("mri", "d:custom/rt@millisecond"),
("mri.namespace", "custom"),
("outcome.id", "0"),
)
);

let Aggregator::MergeBuckets(mb) = receiver.blocking_recv().unwrap() else {
panic!();
};
assert_eq!(mb.project_key(), scoping.project_key);

let mut buckets = mb.buckets();
assert_eq!(buckets.len(), 1);
let bucket = buckets.pop().unwrap();

assert_eq!(&*bucket.name, "c:metric_stats/volume@none");
assert_eq!(bucket.value, BucketValue::Counter(42.into()));
assert_eq!(
bucket.tags,
tags!(
("mri", "d:custom/rt@millisecond"),
("mri.namespace", "custom"),
("outcome.id", "2"),
("outcome.reason", "foobar"),
)
);

assert!(receiver.blocking_recv().is_none());
}

#[test]
fn test_metric_stats_rollout_rate_disabled() {
let (ms, mut receiver) = create_metric_stats(0.0);

let scoping = scoping();
let bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
ms.track(scoping, &BucketView::from(&bucket), Outcome::Accepted);

drop(ms);

assert!(receiver.blocking_recv().is_none());
}

#[test]
fn test_metric_stats_disabled_namespace() {
let (ms, mut receiver) = create_metric_stats(1.0);

let scoping = scoping();
let bucket =
Bucket::parse(b"transactions/rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
ms.track(scoping, &BucketView::from(&bucket), Outcome::Accepted);

drop(ms);

assert!(receiver.blocking_recv().is_none());
}
}
6 changes: 5 additions & 1 deletion relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ impl ServiceState {
)
.start_in(&runtimes.aggregator);

let metric_stats = MetricStats::new(config.clone(), aggregator.clone());
let metric_stats = MetricStats::new(
config.clone(),
global_config_handle.clone(),
aggregator.clone(),
);

#[cfg(feature = "processing")]
let store = match runtimes.store {
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ fn is_metric_namespace_valid(state: &ProjectState, namespace: &MetricNamespace)
}
MetricNamespace::Profiles => true,
MetricNamespace::Custom => state.has_feature(Feature::CustomMetrics),
MetricNamespace::Stats => state.has_feature(Feature::MetricStats),
MetricNamespace::Stats => true,
MetricNamespace::Unsupported => false,
}
}
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/test_metric_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def metric_stats_by_mri(metrics_consumer, count, timeout=None):


def test_metric_stats_simple(mini_sentry, relay_with_processing, metrics_consumer):
mini_sentry.global_config["options"]["relay.metric-stats.rollout-rate"] = 1.0

metrics_consumer = metrics_consumer()
relay = relay_with_processing(options=TEST_CONFIG)

Expand Down