Skip to content

Commit

Permalink
Rate limiting for shard operations (qdrant#5582)
Browse files Browse the repository at this point in the history
* Rate limiting for shard operations

* address all review comments in one go
  • Loading branch information
agourlay authored and timvisee committed Dec 9, 2024
1 parent b94f13a commit 3c19eea
Show file tree
Hide file tree
Showing 19 changed files with 217 additions and 10 deletions.
2 changes: 2 additions & 0 deletions docs/grpc/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,8 @@ Note: 1kB = 1 vector of size 256. |
| search_max_oversampling | [float](#float) | optional | |
| upsert_max_batchsize | [uint64](#uint64) | optional | |
| max_collection_vector_size_bytes | [uint64](#uint64) | optional | |
| read_rate_limit_per_sec | [uint32](#uint32) | optional | |
| write_rate_limit_per_sec | [uint32](#uint32) | optional | |



Expand Down
14 changes: 14 additions & 0 deletions docs/redoc/master/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -7298,6 +7298,20 @@
"format": "uint",
"minimum": 0,
"nullable": true
},
"read_rate_limit_per_sec": {
"description": "Max number of read operations per second per shard per peer",
"type": "integer",
"format": "uint",
"minimum": 0,
"nullable": true
},
"write_rate_limit_per_sec": {
"description": "Max number of write operations per second per shard per peer",
"type": "integer",
"format": "uint",
"minimum": 0,
"nullable": true
}
}
},
Expand Down
4 changes: 4 additions & 0 deletions lib/api/src/grpc/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1635,6 +1635,8 @@ impl From<StrictModeConfig> for segment::types::StrictModeConfig {
max_collection_vector_size_bytes: value
.max_collection_vector_size_bytes
.map(|i| i as usize),
read_rate_limit_per_sec: value.read_rate_limit_per_sec.map(|i| i as usize),
write_rate_limit_per_sec: value.write_rate_limit_per_sec.map(|i| i as usize),
}
}
}
Expand All @@ -1654,6 +1656,8 @@ impl From<segment::types::StrictModeConfig> for StrictModeConfig {
max_collection_vector_size_bytes: value
.max_collection_vector_size_bytes
.map(|i| i as u64),
read_rate_limit_per_sec: value.read_rate_limit_per_sec.map(|i| i as u32),
write_rate_limit_per_sec: value.write_rate_limit_per_sec.map(|i| i as u32),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions lib/api/src/grpc/proto/collections.proto
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ message StrictModeConfig {
optional float search_max_oversampling = 8;
optional uint64 upsert_max_batchsize = 9;
optional uint64 max_collection_vector_size_bytes = 10;
optional uint32 read_rate_limit_per_sec = 11;
optional uint32 write_rate_limit_per_sec = 12;
}

message CreateCollection {
Expand Down
4 changes: 4 additions & 0 deletions lib/api/src/grpc/qdrant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,10 @@ pub struct StrictModeConfig {
pub upsert_max_batchsize: ::core::option::Option<u64>,
#[prost(uint64, optional, tag = "10")]
pub max_collection_vector_size_bytes: ::core::option::Option<u64>,
#[prost(uint32, optional, tag = "11")]
pub read_rate_limit_per_sec: ::core::option::Option<u32>,
#[prost(uint32, optional, tag = "12")]
pub write_rate_limit_per_sec: ::core::option::Option<u32>,
}
#[derive(validator::Validate)]
#[derive(serde::Serialize)]
Expand Down
7 changes: 7 additions & 0 deletions lib/collection/src/collection/collection_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,14 @@ impl Collection {
config.strict_mode_config = Some(strict_mode_diff);
}
}
// update collection config
self.collection_config.read().await.save(&self.path)?;
// apply config change to all shards
let shard_holder = self.shards_holder.read().await;
let updates = shard_holder
.all_shards()
.map(|replica_set| replica_set.on_strict_mode_config_update());
future::try_join_all(updates).await?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion lib/collection/src/operations/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,6 @@ impl CollectionError {
Self::Cancelled { .. } => true,
Self::OutOfMemory { .. } => true,
Self::PreConditionFailed { .. } => true,
Self::RateLimitExceeded { .. } => true,
// Not transient
Self::BadInput { .. } => false,
Self::NotFound { .. } => false,
Expand All @@ -1119,6 +1118,7 @@ impl CollectionError {
Self::ObjectStoreError { .. } => false,
Self::StrictMode { .. } => false,
Self::InferenceError { .. } => false,
Self::RateLimitExceeded { .. } => false,
}
}

Expand Down
2 changes: 2 additions & 0 deletions lib/collection/src/operations/verification/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ mod test {
search_max_oversampling: Some(0.2),
upsert_max_batchsize: None,
max_collection_vector_size_bytes: None,
read_rate_limit_per_sec: None,
write_rate_limit_per_sec: None,
};

fixture_collection(&strict_mode_config).await
Expand Down
2 changes: 2 additions & 0 deletions lib/collection/src/shards/dummy_shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ impl DummyShard {
self.dummy()
}

pub async fn on_strict_mode_config_update(&self) {}

pub fn get_telemetry_data(&self) -> LocalShardTelemetry {
LocalShardTelemetry {
variant_name: Some("dummy shard".into()),
Expand Down
4 changes: 4 additions & 0 deletions lib/collection/src/shards/forward_proxy_shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ impl ForwardProxyShard {
self.wrapped_shard.on_optimizer_config_update().await
}

pub async fn on_strict_mode_config_update(&self) {
self.wrapped_shard.on_strict_mode_config_update().await
}

pub fn trigger_optimizers(&self) {
self.wrapped_shard.trigger_optimizers();
}
Expand Down
52 changes: 44 additions & 8 deletions lib/collection/src/shards/local_shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ pub struct LocalShard {
update_runtime: Handle,
pub(super) search_runtime: Handle,
disk_usage_watcher: DiskUsageWatcher,
read_rate_limiter: Option<ParkingMutex<RateLimiter>>,
write_rate_limiter: Option<ParkingMutex<RateLimiter>>,
read_rate_limiter: ParkingMutex<Option<RateLimiter>>,
write_rate_limiter: ParkingMutex<Option<RateLimiter>>,
}

/// Shard holds information about segments and WAL.
Expand Down Expand Up @@ -196,6 +196,20 @@ impl LocalShard {

let update_tracker = segment_holder.read().update_tracker();

let read_rate_limiter = config.strict_mode_config.as_ref().and_then(|strict_mode| {
strict_mode
.read_rate_limit_per_sec
.map(RateLimiter::with_rate_per_sec)
});
let read_rate_limiter = ParkingMutex::new(read_rate_limiter);

let write_rate_limiter = config.strict_mode_config.as_ref().and_then(|strict_mode| {
strict_mode
.write_rate_limit_per_sec
.map(RateLimiter::with_rate_per_sec)
});
let write_rate_limiter = ParkingMutex::new(write_rate_limiter);

drop(config); // release `shared_config` from borrow checker

Self {
Expand All @@ -214,8 +228,8 @@ impl LocalShard {
optimizers_log,
total_optimized_points,
disk_usage_watcher,
read_rate_limiter: None, // TODO initialize rate limiter from config
write_rate_limiter: None, // TODO initialize rate limiter from config
read_rate_limiter,
write_rate_limiter,
}
}

Expand Down Expand Up @@ -741,6 +755,28 @@ impl LocalShard {
Ok(())
}

/// Apply shard's strict mode configuration update
/// - Update read and write rate limiters
pub async fn on_strict_mode_config_update(&self) {
let config = self.collection_config.read().await;

if let Some(strict_mode_config) = &config.strict_mode_config {
// Update read rate limiter
if let Some(read_rate_limit_per_sec) = strict_mode_config.read_rate_limit_per_sec {
let mut read_rate_limiter_guard = self.read_rate_limiter.lock();
read_rate_limiter_guard
.replace(RateLimiter::with_rate_per_sec(read_rate_limit_per_sec));
}

// update write rate limiter
if let Some(write_rate_limit_per_sec) = strict_mode_config.write_rate_limit_per_sec {
let mut write_rate_limiter_guard = self.write_rate_limiter.lock();
write_rate_limiter_guard
.replace(RateLimiter::with_rate_per_sec(write_rate_limit_per_sec));
}
}
}

pub fn trigger_optimizers(&self) {
// Send a trigger signal and ignore errors because all error cases are acceptable:
// - If receiver is already dead - we do not care
Expand Down Expand Up @@ -1107,8 +1143,8 @@ impl LocalShard {
///
/// Returns an error if the rate limit is exceeded.
fn check_write_rate_limiter(&self) -> CollectionResult<()> {
if let Some(rate_limiter) = &self.write_rate_limiter {
if !rate_limiter.lock().check() {
if let Some(rate_limiter) = self.write_rate_limiter.lock().as_mut() {
if !rate_limiter.check() {
return Err(CollectionError::RateLimitExceeded {
description: "Write rate limit exceeded, retry later".to_string(),
});
Expand All @@ -1121,8 +1157,8 @@ impl LocalShard {
///
/// Returns an error if the rate limit is exceeded.
fn check_read_rate_limiter(&self) -> CollectionResult<()> {
if let Some(rate_limiter) = &self.read_rate_limiter {
if !rate_limiter.lock().check() {
if let Some(rate_limiter) = self.read_rate_limiter.lock().as_mut() {
if !rate_limiter.check() {
return Err(CollectionError::RateLimitExceeded {
description: "Read rate limit exceeded, retry later".to_string(),
});
Expand Down
4 changes: 4 additions & 0 deletions lib/collection/src/shards/proxy_shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ impl ProxyShard {
self.wrapped_shard.on_optimizer_config_update().await
}

pub async fn on_strict_mode_config_update(&self) {
self.wrapped_shard.on_strict_mode_config_update().await;
}

pub fn trigger_optimizers(&self) {
// TODO: we might want to defer this trigger until we unproxy
self.wrapped_shard.trigger_optimizers();
Expand Down
7 changes: 7 additions & 0 deletions lib/collection/src/shards/queue_proxy_shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ impl QueueProxyShard {
.await
}

pub async fn on_strict_mode_config_update(&self) {
self.inner_unchecked()
.wrapped_shard
.on_strict_mode_config_update()
.await
}

pub fn trigger_optimizers(&self) {
self.inner_unchecked().wrapped_shard.trigger_optimizers();
}
Expand Down
10 changes: 9 additions & 1 deletion lib/collection/src/shards/replica_set/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,15 @@ impl ShardReplicaSet {
}
}

/// Check if the are any locally disabled peers
pub(crate) async fn on_strict_mode_config_update(&self) -> CollectionResult<()> {
let read_local = self.local.read().await;
if let Some(shard) = &*read_local {
shard.on_strict_mode_config_update().await
}
Ok(())
}

/// Check if there are any locally disabled peers
/// And if so, report them to the consensus
pub fn sync_local_state<F>(&self, get_shard_transfers: F) -> CollectionResult<()>
where
Expand Down
10 changes: 10 additions & 0 deletions lib/collection/src/shards/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ impl Shard {
}
}

pub async fn on_strict_mode_config_update(&self) {
match self {
Shard::Local(local_shard) => local_shard.on_strict_mode_config_update().await,
Shard::Proxy(proxy_shard) => proxy_shard.on_strict_mode_config_update().await,
Shard::ForwardProxy(proxy_shard) => proxy_shard.on_strict_mode_config_update().await,
Shard::QueueProxy(proxy_shard) => proxy_shard.on_strict_mode_config_update().await,
Shard::Dummy(dummy_shard) => dummy_shard.on_strict_mode_config_update().await,
}
}

pub fn trigger_optimizers(&self) {
match self {
Shard::Local(local_shard) => local_shard.trigger_optimizers(),
Expand Down
6 changes: 6 additions & 0 deletions lib/common/common/src/rate_limiting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ impl RateLimiter {
}
}

/// Create a new rate limiter from a rate per second.
pub fn with_rate_per_sec(rate_per_sec: usize) -> Self {
let rate = Rate::new(rate_per_sec as u64, Duration::from_secs(1));
Self::new(rate)
}

/// Attempt to consume a token. Returns `true` if allowed, `false` otherwise.
pub fn check(&mut self) -> bool {
let now = Instant::now();
Expand Down
12 changes: 12 additions & 0 deletions lib/segment/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,14 @@ pub struct StrictModeConfig {
/// Max size of a collections vector storage in bytes
#[serde(skip_serializing_if = "Option::is_none")]
pub max_collection_vector_size_bytes: Option<usize>,

/// Max number of read operations per second per shard per peer
#[serde(skip_serializing_if = "Option::is_none")]
pub read_rate_limit_per_sec: Option<usize>,

/// Max number of write operations per second per shard per peer
#[serde(skip_serializing_if = "Option::is_none")]
pub write_rate_limit_per_sec: Option<usize>,
}

impl Eq for StrictModeConfig {}
Expand All @@ -729,6 +737,8 @@ impl Hash for StrictModeConfig {
search_max_oversampling: _,
upsert_max_batchsize,
max_collection_vector_size_bytes,
read_rate_limit_per_sec,
write_rate_limit_per_sec,
} = self;
(
enabled,
Expand All @@ -740,6 +750,8 @@ impl Hash for StrictModeConfig {
search_allow_exact,
upsert_max_batchsize,
max_collection_vector_size_bytes,
read_rate_limit_per_sec,
write_rate_limit_per_sec,
)
.hash(state);
}
Expand Down
2 changes: 2 additions & 0 deletions lib/storage/src/content_manager/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub fn strict_mode_from_api(value: api::grpc::qdrant::StrictModeConfig) -> Stric
max_collection_vector_size_bytes: value
.max_collection_vector_size_bytes
.map(|i| i as usize),
read_rate_limit_per_sec: value.write_rate_limit_per_sec.map(|i| i as usize),
write_rate_limit_per_sec: value.write_rate_limit_per_sec.map(|i| i as usize),
}
}

Expand Down
Loading

0 comments on commit 3c19eea

Please sign in to comment.