Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Expose rate limits for metrics #3347

Merged
merged 14 commits into from
Mar 28, 2024
Next Next commit
feat(server): Expose rate limits for metrics
  • Loading branch information
jan-auer committed Mar 27, 2024
commit f987757a2b282a72d0d3e3554ffbfcd75b2e2f69
57 changes: 53 additions & 4 deletions relay-quotas/src/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,11 @@ impl RateLimits {
// Categories are logically a set, but not implemented as such.
limit.categories.sort();

let limit_opt = self
.limits
.iter_mut()
.find(|l| l.categories == limit.categories && l.scope == limit.scope);
let limit_opt = self.limits.iter_mut().find(|l| {
l.categories == limit.categories
&& l.scope == limit.scope
&& l.namespace == limit.namespace
});

match limit_opt {
None => self.limits.push(limit),
Expand Down Expand Up @@ -686,6 +687,54 @@ mod tests {
"###);
}

/// Regression test that ensures namespaces are correctly added to rate limits.
#[test]
fn test_rate_limits_add_namespaces() {
let mut rate_limits = RateLimits::new();

rate_limits.add(RateLimit {
categories: smallvec![DataCategory::MetricBucket],
scope: RateLimitScope::Organization(42),
reason_code: None,
retry_after: RetryAfter::from_secs(1),
namespace: Some(MetricNamespace::Custom),
});

// Same scope but different categories
jan-auer marked this conversation as resolved.
Show resolved Hide resolved
rate_limits.add(RateLimit {
categories: smallvec![DataCategory::MetricBucket],
scope: RateLimitScope::Organization(42),
reason_code: None,
retry_after: RetryAfter::from_secs(1),
namespace: Some(MetricNamespace::Spans),
});

insta::assert_ron_snapshot!(rate_limits, @r###"
RateLimits(
limits: [
RateLimit(
categories: [
metric_bucket,
],
scope: Organization(42),
reason_code: None,
retry_after: RetryAfter(1),
namespace: Some("custom"),
),
RateLimit(
categories: [
metric_bucket,
],
scope: Organization(42),
reason_code: None,
retry_after: RetryAfter(1),
namespace: Some("spans"),
),
],
)
"###);
}

#[test]
fn test_rate_limits_longest() {
let mut rate_limits = RateLimits::new();
Expand Down
27 changes: 27 additions & 0 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
//! ```

use relay_event_normalization::{normalize_transaction_name, TransactionNameRule};
use relay_metrics::MetricNamespace;
use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
Expand Down Expand Up @@ -554,6 +555,12 @@ pub struct ItemHeaders {
#[serde(default, skip_serializing_if = "is_false")]
metrics_extracted: bool,

/// Namespace for metric items.
///
/// TODO(ja): Doc
#[serde(default, skip_serializing_if = "Option::is_none")]
namespace: Option<MetricNamespace>,

/// `false` if the sampling decision is "drop".
///
/// In the most common use case, the item is dropped when the sampling decision is "drop".
Expand Down Expand Up @@ -628,6 +635,7 @@ impl Item {
sample_rates: None,
other: BTreeMap::new(),
metrics_extracted: false,
namespace: None,
sampled: true,
},
payload: Bytes::new(),
Expand Down Expand Up @@ -833,6 +841,25 @@ impl Item {
self.headers.metrics_extracted = metrics_extracted;
}

/// Returns the namespace for metric items.
pub fn namespace(&self) -> Option<MetricNamespace> {
self.headers.namespace.filter(|_| self.ty().is_metrics())
}

/// Sets the namespace for metric items.
///
/// This should not be called for non-metric items.
pub fn set_namespace(&mut self, namespace: MetricNamespace) {
debug_assert!(
self.ty().is_metrics(),
"setting namespace on non-metric item"
);

if self.ty().is_metrics() {
self.headers.namespace = Some(namespace);
}
}

/// Gets the `sampled` flag.
pub fn sampled(&self) -> bool {
self.headers.sampled
Expand Down
10 changes: 9 additions & 1 deletion relay-server/src/services/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1102,11 +1102,19 @@ impl Project {
Ok(self.rate_limits.check_with_quotas(quotas, item_scoping))
});

let (enforcement, rate_limits) =
let (enforcement, mut rate_limits) =
envelope_limiter.enforce(envelope.envelope_mut(), &scoping)?;
enforcement.track_outcomes(envelope.envelope(), &scoping, outcome_aggregator);
envelope.update();

// Special case: Expose active rate limits for all metric namespaces if there is at least
// one metrics item in the Envelope to communicate backoff to SDKs. This is necessary
// because `EnvelopeLimiter` cannot not check metrics without parsing item contents.
if envelope.envelope().items().any(|i| i.ty().is_metrics()) {
let metrics_scoping = scoping.item(DataCategory::MetricBucket);
rate_limits.merge(self.rate_limits.check_with_quotas(quotas, metrics_scoping));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not work yet. The scoping does not have a namespace, hence no quotas with namespaces match.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. We now check with MetricNamespaceScoping::Any, which will resolve all metrics. This also makes the API a bit more explicit by adding specific doc comments for the semantics of None.

}

let envelope = if envelope.envelope().is_empty() {
// Individual rate limits have already been issued above
envelope.reject(Outcome::RateLimited(None));
Expand Down
70 changes: 68 additions & 2 deletions relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ pub fn format_rate_limits(rate_limits: &RateLimits) -> String {

if let Some(ref reason_code) = rate_limit.reason_code {
write!(header, ":{reason_code}").ok();
} else if rate_limit.namespace.is_some() {
write!(header, ":").ok();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me a while to understand this code.

Suggested change
write!(header, ":").ok();
write!(header, ":").ok(); // delimits the empty reason code

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the comment helps. I was hoping to rewrite this in a better way, but everything else I could come up with ended up much more complex.

}

if let Some(namespace) = rate_limit.namespace {
write!(header, ":{namespace}").ok();
}
}

Expand Down Expand Up @@ -68,14 +74,19 @@ pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
let quota_scope = QuotaScope::from_name(components.next().unwrap_or(""));
let scope = RateLimitScope::for_quota(scoping, quota_scope);

let reason_code = components.next().map(ReasonCode::new);
let reason_code = components
.next()
.filter(|s| !s.is_empty())
.map(ReasonCode::new);

let namespace = components.next().and_then(|s| s.parse().ok());

rate_limits.add(RateLimit {
categories,
scope,
reason_code,
retry_after,
namespace: None,
namespace,
});
}

Expand Down Expand Up @@ -749,6 +760,7 @@ mod tests {

use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_dynamic_config::TransactionMetricsConfig;
use relay_metrics::MetricNamespace;
use relay_quotas::RetryAfter;
use smallvec::smallvec;

Expand Down Expand Up @@ -785,6 +797,34 @@ mod tests {
assert_eq!(formatted, expected);
}

#[test]
fn test_format_rate_limits_namespace() {
let mut rate_limits = RateLimits::new();

// Rate limit with reason code and namespace.
rate_limits.add(RateLimit {
categories: smallvec![DataCategory::MetricBucket],
scope: RateLimitScope::Organization(42),
reason_code: Some(ReasonCode::new("my_limit")),
retry_after: RetryAfter::from_secs(42),
namespace: Some(MetricNamespace::Custom),
});

// Rate limit without reason code.
rate_limits.add(RateLimit {
categories: smallvec![DataCategory::MetricBucket],
scope: RateLimitScope::Organization(42),
reason_code: None,
retry_after: RetryAfter::from_secs(42),
namespace: Some(MetricNamespace::Spans),
});

let formatted = format_rate_limits(&rate_limits);
let expected =
"42:metric_bucket:organization:my_limit:custom, 42:metric_bucket:organization::spans";
assert_eq!(formatted, expected);
}

#[test]
fn test_parse_invalid_rate_limits() {
let scoping = Scoping {
Expand Down Expand Up @@ -842,6 +882,32 @@ mod tests {
assert_eq!(4711, rate_limits[1].retry_after.remaining_seconds());
}

#[test]
fn test_parse_rate_limits_namespace() {
let scoping = Scoping {
organization_id: 42,
project_id: ProjectId::new(21),
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
};

// contains "foobar", an unknown scope that should be mapped to Unknown
jan-auer marked this conversation as resolved.
Show resolved Hide resolved
let formatted = "42:metric_bucket:organization::custom";
let rate_limits: Vec<RateLimit> =
parse_rate_limits(&scoping, formatted).into_iter().collect();

assert_eq!(
rate_limits,
vec![RateLimit {
categories: smallvec![DataCategory::MetricBucket],
scope: RateLimitScope::Organization(42),
reason_code: None,
retry_after: rate_limits[0].retry_after,
namespace: Some(MetricNamespace::Custom),
},]
);
}

#[test]
fn test_parse_rate_limits_only_unknown() {
let scoping = Scoping {
Expand Down
Loading