-
Notifications
You must be signed in to change notification settings - Fork 92
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(metric-stats): Emit accepted volume
- Loading branch information
Showing
10 changed files
with
242 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
use std::collections::BTreeMap; | ||
use std::sync::Arc; | ||
|
||
use relay_config::Config; | ||
use relay_metrics::{ | ||
Aggregator, Bucket, BucketValue, BucketView, MergeBuckets, MetricResourceIdentifier, | ||
UnixTimestamp, | ||
}; | ||
use relay_quotas::Scoping; | ||
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; | ||
|
||
use crate::services::outcome::Outcome; | ||
|
||
#[derive(Debug)] | ||
pub struct TrackMetric { | ||
pub timestamp: UnixTimestamp, | ||
pub scoping: Scoping, | ||
pub mri: String, | ||
pub outcome: Outcome, | ||
pub volume: u32, | ||
} | ||
|
||
impl TrackMetric { | ||
pub fn volume(scoping: Scoping, bucket: &BucketView<'_>, outcome: Outcome) -> Self { | ||
Self { | ||
timestamp: UnixTimestamp::now(), | ||
scoping, | ||
mri: bucket.name().to_owned(), | ||
outcome, | ||
volume: bucket.metadata().merges.get(), | ||
} | ||
} | ||
|
||
fn to_volume_metric(&self) -> Option<Bucket> { | ||
if self.volume == 0 { | ||
return None; | ||
} | ||
|
||
let namespace = MetricResourceIdentifier::parse(&self.mri).ok()?.namespace; | ||
if !namespace.has_metric_stats() { | ||
return None; | ||
} | ||
|
||
let mut tags = BTreeMap::from([ | ||
("mri".to_owned(), self.mri.to_owned()), | ||
("mri.namespace".to_owned(), namespace.to_string()), | ||
( | ||
"outcome.id".to_owned(), | ||
self.outcome.to_outcome_id().as_u8().to_string(), | ||
), | ||
]); | ||
|
||
if let Some(reason) = self.outcome.to_reason() { | ||
tags.insert("outcome.reason".to_owned(), reason.into_owned()); | ||
} | ||
|
||
Some(Bucket { | ||
timestamp: self.timestamp, | ||
width: 0, | ||
name: "c:metric_stats/volume@none".to_owned(), | ||
value: BucketValue::Counter(self.volume.into()), | ||
tags, | ||
metadata: Default::default(), | ||
}) | ||
} | ||
} | ||
|
||
impl Interface for TrackMetric {} | ||
|
||
impl FromMessage<Self> for TrackMetric { | ||
type Response = NoResponse; | ||
|
||
fn from_message(message: Self, _: ()) -> Self { | ||
message | ||
} | ||
} | ||
|
||
pub struct MetricStatsService { | ||
config: Arc<Config>, | ||
aggregator: Addr<Aggregator>, | ||
} | ||
|
||
impl MetricStatsService { | ||
pub fn new(config: Arc<Config>, aggregator: Addr<Aggregator>) -> Self { | ||
Self { config, aggregator } | ||
} | ||
|
||
fn handle_track_metric(&self, tm: TrackMetric) { | ||
if !self.config.processing_enabled() { | ||
return; | ||
} | ||
|
||
let Some(volume) = tm.to_volume_metric() else { | ||
return; | ||
}; | ||
|
||
relay_log::trace!("Tracking volume of {} for mri '{}'", tm.volume, tm.mri); | ||
self.aggregator | ||
.send(MergeBuckets::new(tm.scoping.project_key, vec![volume])); | ||
} | ||
} | ||
|
||
impl Service for MetricStatsService { | ||
type Interface = TrackMetric; | ||
|
||
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) { | ||
tokio::spawn(async move { | ||
while let Some(message) = rx.recv().await { | ||
self.handle_track_metric(message); | ||
} | ||
dbg!("END"); | ||
}); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.