Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Expose rate limits for metrics #3347

Merged
merged 14 commits into from
Mar 28, 2024
Prev Previous commit
Next Next commit
fix: Properly check rate limits in the fast path
  • Loading branch information
jan-auer committed Mar 28, 2024
commit 59dc5e6db16d43619350b1760c0d32ae3b064235
32 changes: 12 additions & 20 deletions relay-quotas/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ mod tests {
use relay_common::time::UnixTimestamp;
use relay_redis::{RedisConfigOptions, RedisPool};

use crate::{DataCategories, ItemScoping, Quota, QuotaScope, Scoping};
use crate::{DataCategories, Quota, QuotaScope, Scoping};

fn build_redis_pool() -> RedisPool {
let url = std::env::var("RELAY_REDIS_URL")
Expand Down Expand Up @@ -284,11 +284,7 @@ mod tests {
}

fn build_redis_quota<'a>(quota: &'a Quota, scoping: &'a Scoping) -> RedisQuota<'a> {
let scoping = ItemScoping {
category: DataCategory::MetricBucket,
scoping,
namespace: None,
};
let scoping = scoping.item(DataCategory::MetricBucket);
RedisQuota::new(quota, scoping, UnixTimestamp::now()).unwrap()
}

Expand Down Expand Up @@ -470,18 +466,14 @@ mod tests {
let ts = UnixTimestamp::now();
let quota = build_quota(window, limit);
let scoping = build_scoping();
let scoping = ItemScoping {
category: DataCategory::MetricBucket,
scoping: &scoping,
namespace: None,
};
let item_scoping = scoping.item(DataCategory::MetricBucket);

let pool = build_redis_pool();
let mut client = pool.client().unwrap();

let rl = GlobalRateLimits::default();

let redis_quota = [RedisQuota::new(&quota, scoping, ts).unwrap()];
let redis_quota = [RedisQuota::new(&quota, item_scoping, ts).unwrap()];
assert!(rl
.filter_rate_limited(&mut client, &redis_quota, 200)
.unwrap()
Expand All @@ -494,7 +486,10 @@ mod tests {

// Fast forward time.
let redis_quota =
[RedisQuota::new(&quota, scoping, ts + Duration::from_secs(window + 1)).unwrap()];
[
RedisQuota::new(&quota, item_scoping, ts + Duration::from_secs(window + 1))
.unwrap(),
];
assert!(rl
.filter_rate_limited(&mut client, &redis_quota, 200)
.unwrap()
Expand All @@ -513,11 +508,8 @@ mod tests {
let timestamp = UnixTimestamp::now();

let mut quota = build_quota(100, limit);
let scoping = ItemScoping {
category: DataCategory::MetricBucket,
scoping: &build_scoping(),
namespace: None,
};
let scoping = build_scoping();
let item_scoping = scoping.item(DataCategory::MetricBucket);

let pool = build_redis_pool();
let mut client = pool.client().unwrap();
Expand All @@ -527,7 +519,7 @@ mod tests {
let quantity = 2;
let redis_threshold = (quantity as f32 / DEFAULT_BUDGET_RATIO) as u64;
for _ in 0..redis_threshold + 10 {
let redis_quota = RedisQuota::new(&quota, scoping, timestamp).unwrap();
let redis_quota = RedisQuota::new(&quota, item_scoping, timestamp).unwrap();
assert!(rl
.filter_rate_limited(&mut client, &[redis_quota], quantity)
.unwrap()
Expand All @@ -539,7 +531,7 @@ mod tests {
let rl = GlobalRateLimits::default();

quota.limit = Some(redis_threshold);
let redis_quota = RedisQuota::new(&quota, scoping, timestamp).unwrap();
let redis_quota = RedisQuota::new(&quota, item_scoping, timestamp).unwrap();

assert!(!rl
.filter_rate_limited(&mut client, &[redis_quota], quantity)
Expand Down
100 changes: 80 additions & 20 deletions relay-quotas/src/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,63 @@ impl Scoping {
ItemScoping {
category,
scoping: self,
namespace: None,
namespace: MetricNamespaceScoping::None,
}
}

/// Returns an `ItemScoping` for metric buckets in this scope.
///
/// The item scoping will contain a reference to this scope, the category
/// [`MetricBucket](DataCategory::MetricBucket), and the information passed to this function.
/// This is a cheap operation to allow rate limiting for an individual item.
pub fn metric_bucket(&self, namespace: MetricNamespace) -> ItemScoping<'_> {
ItemScoping {
category: DataCategory::MetricBucket,
scoping: self,
namespace: MetricNamespaceScoping::Some(namespace),
}
}
}

/// Item scoping of metric namespaces.
///
/// This enum is used in [`ItemScoping`] to declare the contents of an item's metric namespace.
#[derive(Clone, Copy, Debug, Default, PartialEq, Hash, PartialOrd)]
pub enum MetricNamespaceScoping {
/// This item does not contain metrics of any namespace. This should only be used for non-metric
/// items.
#[default]
None,

/// This item contains metrics of a specific namespace.
Some(MetricNamespace),

/// This item contains metrics of any namespace.
///
/// The namespace of metrics contained in this item is not known. This can be used to check rate
/// limits or quotas of any namespace.
Any,
}

impl MetricNamespaceScoping {
/// Returns `true` if the given namespace matches the namespace of the item.
///
/// If the self is `Any`, this method returns `true` for any namespace.
pub fn matches(&self, namespace: MetricNamespace) -> bool {
match self {
Self::None => false,
Self::Some(ns) => *ns == namespace,
Self::Any => true,
}
}
}

impl From<MetricNamespace> for MetricNamespaceScoping {
fn from(namespace: MetricNamespace) -> Self {
Self::Some(namespace)
}
}

/// Data categorization and scoping information.
///
/// `ItemScoping` is always attached to a `Scope` and references it internally. It is a cheap,
Expand All @@ -55,7 +107,7 @@ pub struct ItemScoping<'a> {
pub scoping: &'a Scoping,

/// Namespace if applicable. `None` matches any namespace.
pub namespace: Option<MetricNamespace>,
pub namespace: MetricNamespaceScoping,
}

impl AsRef<Scoping> for ItemScoping<'_> {
Expand Down Expand Up @@ -92,6 +144,17 @@ impl ItemScoping<'_> {
// not support yet.
categories.is_empty() || categories.iter().any(|cat| *cat == self.category)
}

/// Returns `true` if the rate limit namespace matches the namespace of the item.
///
/// If the list of namespaces is empty, this method returns `true`.
pub(crate) fn matches_namespaces<'a, I>(&self, namespaces: I) -> bool
where
I: IntoIterator<Item = &'a MetricNamespace>,
{
let mut iter = namespaces.into_iter().peekable();
iter.peek().is_none() || iter.any(|ns| self.namespace.matches(*ns))
}
}

/// The unit in which a data category is measured.
Expand Down Expand Up @@ -312,11 +375,6 @@ impl Quota {
/// - the `scope_id` constraint is not numeric
/// - the scope identifier matches the one from ascoping and the scope is known
fn matches_scope(&self, scoping: ItemScoping<'_>) -> bool {
// Accept all types of namespaces if none are configured in the quota.
if self.namespace.is_some() && self.namespace != scoping.namespace {
return false;
}

if self.scope == QuotaScope::Global {
return true;
}
Expand All @@ -340,7 +398,9 @@ impl Quota {

/// Checks whether the quota's constraints match the current item.
pub fn matches(&self, scoping: ItemScoping<'_>) -> bool {
self.matches_scope(scoping) && scoping.matches_categories(&self.categories)
self.matches_scope(scoping)
&& scoping.matches_categories(&self.categories)
&& scoping.matches_namespaces(&self.namespace)
}
}

Expand Down Expand Up @@ -654,7 +714,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));
}

Expand All @@ -679,7 +739,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));
}

Expand All @@ -704,7 +764,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));

assert!(!quota.matches(ItemScoping {
Expand All @@ -715,7 +775,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));
}

Expand All @@ -740,7 +800,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));
}

Expand All @@ -765,7 +825,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));

assert!(!quota.matches(ItemScoping {
Expand All @@ -776,7 +836,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));
}

Expand All @@ -801,7 +861,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));

assert!(!quota.matches(ItemScoping {
Expand All @@ -812,7 +872,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));
}

Expand All @@ -837,7 +897,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(17),
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));

assert!(!quota.matches(ItemScoping {
Expand All @@ -848,7 +908,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: Some(0),
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));

assert!(!quota.matches(ItemScoping {
Expand All @@ -859,7 +919,7 @@ mod tests {
project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
key_id: None,
},
namespace: None,
namespace: MetricNamespaceScoping::None,
}));
}
}
Loading
Loading