Skip to content

Commit

Permalink
feat(metric-stats): Emit accepted volume
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Mar 18, 2024
1 parent 2f49cfe commit e3897dc
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- Track metric bucket metadata for Relay internal usage. ([#3254](https://github.com/getsentry/relay/pull/3254))
- Enforce rate limits for standalone spans. ([#3238](https://github.com/getsentry/relay/pull/3238))
- Extract `span.status_code` tag for HTTP spans. ([#3245](https://github.com/getsentry/relay/pull/3245))
- Implement volume metric stats. ([#3245](https://github.com/getsentry/relay/pull/3245))

**Bug Fixes**:

Expand Down
33 changes: 19 additions & 14 deletions relay-base-schema/src/metrics/mri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,26 +136,31 @@ impl MetricNamespace {
/// Returns all namespaces/variants of this enum.
pub fn all() -> [Self; 7] {
[
MetricNamespace::Sessions,
MetricNamespace::Transactions,
MetricNamespace::Spans,
MetricNamespace::Profiles,
MetricNamespace::Custom,
MetricNamespace::Stats,
MetricNamespace::Unsupported,
Self::Sessions,
Self::Transactions,
Self::Spans,
Self::Profiles,
Self::Custom,
Self::Stats,
Self::Unsupported,
]
}

/// Returns `true` if metric stats are enabled for this namespace.
pub fn has_metric_stats(&self) -> bool {
matches!(self, Self::Custom)
}

/// Returns the string representation for this metric type.
pub fn as_str(&self) -> &'static str {
match self {
MetricNamespace::Sessions => "sessions",
MetricNamespace::Transactions => "transactions",
MetricNamespace::Spans => "spans",
MetricNamespace::Profiles => "profiles",
MetricNamespace::Custom => "custom",
MetricNamespace::Stats => "metric_stats",
MetricNamespace::Unsupported => "unsupported",
Self::Sessions => "sessions",
Self::Transactions => "transactions",
Self::Spans => "spans",
Self::Profiles => "profiles",
Self::Custom => "custom",
Self::Stats => "metric_stats",
Self::Unsupported => "unsupported",
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions relay-dynamic-config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ pub enum Feature {
/// Serialized as `organizations:custom-metrics`.
#[serde(rename = "organizations:custom-metrics")]
CustomMetrics,
/// Enables metric stats.
///
/// Serialized as `organizations:metric-stats`.
#[serde(rename = "organizations:metric-stats")]
MetricStats,

/// Enable processing profiles.
///
Expand Down
8 changes: 4 additions & 4 deletions relay-metrics/src/bucket.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};
use std::hash::Hash;
use std::iter::FusedIterator;
use std::num::NonZeroU64;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::{fmt, mem};

Expand Down Expand Up @@ -755,7 +755,7 @@ pub struct BucketMetadata {
///
/// For example: Merging two un-merged buckets will yield a total
/// of `2` merges.
pub merges: NonZeroU64,
pub merges: NonZeroU32,
}

impl BucketMetadata {
Expand All @@ -764,14 +764,14 @@ impl BucketMetadata {
/// The new metadata is initialized with `1` merge.
pub fn new() -> Self {
Self {
merges: NonZeroU64::MIN,
merges: NonZeroU32::MIN,
}
}

/// Whether the metadata does not contain more information than the default.
pub fn is_default(&self) -> bool {
let Self { merges } = self;
*merges == NonZeroU64::MIN
*merges == NonZeroU32::MIN
}

/// Merges another metadata object into the current one.
Expand Down
6 changes: 6 additions & 0 deletions relay-metrics/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
use std::collections::BTreeMap;
use std::fmt;
use std::ops::Range;
use std::sync::Arc;

use crate::bucket::Bucket;
use crate::BucketValue;
Expand Down Expand Up @@ -376,6 +377,11 @@ impl<'a> BucketView<'a> {
&self.inner.name
}

/// Returns the name of the bucket.
pub fn clone_name(&self) -> Arc<str> {
Arc::clone(&self.inner.name)
}

/// Value of the bucket view.
pub fn value(&self) -> BucketViewValue<'a> {
match &self.inner.value {
Expand Down
6 changes: 6 additions & 0 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::runtime::Runtime;
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
use crate::services::health_check::{HealthCheck, HealthCheckService};
use crate::services::metric_stats::{MetricStatsService, TrackMetric};
use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{EnvelopeProcessor, EnvelopeProcessorService};
Expand Down Expand Up @@ -50,6 +51,7 @@ pub struct Registry {
pub health_check: Addr<HealthCheck>,
pub outcome_producer: Addr<OutcomeProducer>,
pub outcome_aggregator: Addr<TrackOutcome>,
pub metric_stats: Addr<TrackMetric>,
pub processor: Addr<EnvelopeProcessor>,
pub test_store: Addr<TestStore>,
pub relay_cache: Addr<RelayCache>,
Expand Down Expand Up @@ -136,13 +138,16 @@ impl ServiceState {
)
.start_in(&runtimes.aggregator);

let metric_stats = MetricStatsService::new(config.clone(), aggregator.clone()).start();

#[cfg(feature = "processing")]
let store = match runtimes.store {
Some(ref rt) => Some(
StoreService::create(
config.clone(),
global_config_handle.clone(),
outcome_aggregator.clone(),
metric_stats.clone(),
)?
.start_in(rt),
),
Expand Down Expand Up @@ -214,6 +219,7 @@ impl ServiceState {
health_check,
outcome_producer,
outcome_aggregator,
metric_stats,
test_store,
relay_cache,
global_config,
Expand Down
124 changes: 124 additions & 0 deletions relay-server/src/services/metric_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::collections::BTreeMap;
use std::sync::{Arc, OnceLock};

use relay_config::Config;
use relay_metrics::{
Aggregator, Bucket, BucketValue, MergeBuckets, MetricResourceIdentifier, UnixTimestamp,
};
use relay_quotas::Scoping;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};

use crate::services::outcome::Outcome;

fn volume_metric_mri() -> Arc<str> {
static VOLUME_METRIC_MRI: OnceLock<Arc<str>> = OnceLock::new();

Arc::clone(VOLUME_METRIC_MRI.get_or_init(|| "c:metric_stats/volume@none".into()))
}

#[derive(Debug)]
pub struct TrackMetric {
pub timestamp: UnixTimestamp,
pub scoping: Scoping,
pub mri: Arc<str>,
pub outcome: Outcome,
pub volume: u32,
}

impl TrackMetric {
#[cfg(feature = "processing")]
pub fn volume(
scoping: Scoping,
bucket: &relay_metrics::BucketView<'_>,
outcome: Outcome,
) -> Self {
Self {
timestamp: UnixTimestamp::now(),
scoping,
mri: bucket.clone_name(),
outcome,
volume: bucket.metadata().merges.get(),
}
}

fn to_volume_metric(&self) -> Option<Bucket> {
if self.volume == 0 {
return None;
}

let namespace = MetricResourceIdentifier::parse(&self.mri).ok()?.namespace;
if !namespace.has_metric_stats() {
return None;
}

let mut tags = BTreeMap::from([
("mri".to_owned(), self.mri.to_string()),
("mri.namespace".to_owned(), namespace.to_string()),
(
"outcome.id".to_owned(),
self.outcome.to_outcome_id().as_u8().to_string(),
),
]);

if let Some(reason) = self.outcome.to_reason() {
tags.insert("outcome.reason".to_owned(), reason.into_owned());
}

Some(Bucket {
timestamp: self.timestamp,
width: 0,
name: volume_metric_mri(),
value: BucketValue::Counter(self.volume.into()),
tags,
metadata: Default::default(),
})
}
}

impl Interface for TrackMetric {}

impl FromMessage<Self> for TrackMetric {
type Response = NoResponse;

fn from_message(message: Self, _: ()) -> Self {
message
}
}

pub struct MetricStatsService {
config: Arc<Config>,
aggregator: Addr<Aggregator>,
}

impl MetricStatsService {
pub fn new(config: Arc<Config>, aggregator: Addr<Aggregator>) -> Self {
Self { config, aggregator }
}

fn handle_track_metric(&self, tm: TrackMetric) {
if !self.config.processing_enabled() {
return;
}

let Some(volume) = tm.to_volume_metric() else {
return;
};

relay_log::trace!("Tracking volume of {} for mri '{}'", tm.volume, tm.mri);
self.aggregator
.send(MergeBuckets::new(tm.scoping.project_key, vec![volume]));
}
}

impl Service for MetricStatsService {
type Interface = TrackMetric;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
self.handle_track_metric(message);
}
dbg!("END");
});
}
}
1 change: 1 addition & 0 deletions relay-server/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
pub mod cogs;
pub mod global_config;
pub mod health_check;
pub mod metric_stats;
pub mod outcome;
pub mod outcome_aggregator;
pub mod processor;
Expand Down
8 changes: 6 additions & 2 deletions relay-server/src/services/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ impl OutcomeId {
const INVALID: OutcomeId = OutcomeId(3);
const ABUSE: OutcomeId = OutcomeId(4);
const CLIENT_DISCARD: OutcomeId = OutcomeId(5);

pub fn as_u8(self) -> u8 {
self.0
}
}

trait TrackOutcomeLike {
Expand Down Expand Up @@ -180,7 +184,7 @@ pub enum Outcome {

impl Outcome {
/// Returns the raw numeric value of this outcome for the JSON and Kafka schema.
fn to_outcome_id(&self) -> OutcomeId {
pub fn to_outcome_id(&self) -> OutcomeId {
match self {
Outcome::Filtered(_) | Outcome::FilteredSampling(_) => OutcomeId::FILTERED,
Outcome::RateLimited(_) => OutcomeId::RATE_LIMITED,
Expand All @@ -194,7 +198,7 @@ impl Outcome {
}

/// Returns the `reason` code field of this outcome.
fn to_reason(&self) -> Option<Cow<str>> {
pub fn to_reason(&self) -> Option<Cow<str>> {
match self {
Outcome::Invalid(discard_reason) => Some(Cow::Borrowed(discard_reason.name())),
Outcome::Filtered(filter_key) => Some(filter_key.clone().name()),
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ fn is_metric_namespace_valid(state: &ProjectState, namespace: &MetricNamespace)
}
MetricNamespace::Profiles => true,
MetricNamespace::Custom => state.has_feature(Feature::CustomMetrics),
MetricNamespace::Stats => false,
MetricNamespace::Stats => state.has_feature(Feature::MetricStats),
MetricNamespace::Unsupported => false,
}
}
Expand Down
Loading

0 comments on commit e3897dc

Please sign in to comment.