Skip to content

Commit

Permalink
Strict mode max collection vector size (qdrant#5501)
Browse files Browse the repository at this point in the history
* Strict mode config: Max collection size

* api specs

* Add tests + set/update payload check

* Improve function names and add comments

* rename config to separate vectors and payload

* fix tests

* Adjust configs docs

* add benchmark

* improve performance by caching shard info

* add bench for size_info() and fix tests

* Also limit the batch-size for vector updates (qdrant#5508)

* Also limit the batch-size for vector updates

* clippy

* add lost commit

* Load cache on collection initialization

* add unit type to parameter name

* fix renaming in test

* clearer error message

* fix test

* review remarks

* remove unused function for now

---------

Co-authored-by: Arnaud Gourlay <arnaud.gourlay@gmail.com>
  • Loading branch information
2 people authored and timvisee committed Dec 9, 2024
1 parent 9b8b270 commit 0702854
Show file tree
Hide file tree
Showing 32 changed files with 629 additions and 90 deletions.
1 change: 1 addition & 0 deletions docs/grpc/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,7 @@ Note: 1kB = 1 vector of size 256. |
| search_allow_exact | [bool](#bool) | optional | |
| search_max_oversampling | [float](#float) | optional | |
| upsert_max_batchsize | [uint64](#uint64) | optional | |
| max_collection_vector_size_bytes | [uint64](#uint64) | optional | |



Expand Down
7 changes: 7 additions & 0 deletions docs/redoc/master/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -7291,6 +7291,13 @@
"format": "uint",
"minimum": 0,
"nullable": true
},
"max_collection_vector_size_bytes": {
"description": "Max size of a collections vector storage in bytes",
"type": "integer",
"format": "uint",
"minimum": 0,
"nullable": true
}
}
},
Expand Down
6 changes: 6 additions & 0 deletions lib/api/src/grpc/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1632,6 +1632,9 @@ impl From<StrictModeConfig> for segment::types::StrictModeConfig {
search_allow_exact: value.search_allow_exact,
search_max_oversampling: value.search_max_oversampling.map(f64::from),
upsert_max_batchsize: value.upsert_max_batchsize.map(|i| i as usize),
max_collection_vector_size_bytes: value
.max_collection_vector_size_bytes
.map(|i| i as usize),
}
}
}
Expand All @@ -1648,6 +1651,9 @@ impl From<segment::types::StrictModeConfig> for StrictModeConfig {
search_allow_exact: value.search_allow_exact,
search_max_oversampling: value.search_max_oversampling.map(|i| i as f32),
upsert_max_batchsize: value.upsert_max_batchsize.map(|i| i as u64),
max_collection_vector_size_bytes: value
.max_collection_vector_size_bytes
.map(|i| i as u64),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/api/src/grpc/proto/collections.proto
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ message StrictModeConfig {
optional bool search_allow_exact = 7;
optional float search_max_oversampling = 8;
optional uint64 upsert_max_batchsize = 9;
optional uint64 max_collection_vector_size_bytes = 10;
}

message CreateCollection {
Expand Down
2 changes: 2 additions & 0 deletions lib/api/src/grpc/qdrant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ pub struct StrictModeConfig {
pub search_max_oversampling: ::core::option::Option<f32>,
#[prost(uint64, optional, tag = "9")]
pub upsert_max_batchsize: ::core::option::Option<u64>,
#[prost(uint64, optional, tag = "10")]
pub max_collection_vector_size_bytes: ::core::option::Option<u64>,
}
#[derive(validator::Validate)]
#[derive(serde::Serialize)]
Expand Down
14 changes: 14 additions & 0 deletions lib/api/src/rest/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1036,3 +1036,17 @@ impl Validate for PointInsertOperations {
}
}
}

impl PointInsertOperations {
/// Amount of vectors in the operation request.
pub fn len(&self) -> usize {
match self {
PointInsertOperations::PointsBatch(batch) => batch.batch.ids.len(),
PointInsertOperations::PointsList(list) => list.points.len(),
}
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
38 changes: 38 additions & 0 deletions lib/collection/src/collection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tokio::sync::{Mutex, RwLock, RwLockWriteGuard};
use crate::collection::payload_index_schema::PayloadIndexSchema;
use crate::collection_state::{ShardInfo, State};
use crate::common::is_ready::IsReady;
use crate::common::local_data_stats::{LocalDataStats, LocalDataStatsCache};
use crate::config::CollectionConfigInternal;
use crate::operations::config_diff::{DiffConfig, OptimizersConfigDiff};
use crate::operations::shared_storage_config::SharedStorageConfig;
Expand Down Expand Up @@ -80,6 +81,8 @@ pub struct Collection {
// Search runtime handle.
search_runtime: Handle,
optimizer_cpu_budget: CpuBudget,
// Cached stats over all local shards used in strict mode, may be outdated
local_stats_cache: LocalDataStatsCache,
}

pub type RequestShardTransfer = Arc<dyn Fn(ShardTransfer) + Send + Sync>;
Expand Down Expand Up @@ -150,6 +153,10 @@ impl Collection {

let locked_shard_holder = Arc::new(LockedShardHolder::new(shard_holder));

let local_stats_cache = LocalDataStatsCache::new_with_values(
Self::calculate_segment_stats(&locked_shard_holder).await,
);

// Once the config is persisted - the collection is considered to be successfully created.
CollectionVersion::save(path)?;
collection_config.save(path)?;
Expand All @@ -176,6 +183,7 @@ impl Collection {
update_runtime: update_runtime.unwrap_or_else(Handle::current),
search_runtime: search_runtime.unwrap_or_else(Handle::current),
optimizer_cpu_budget,
local_stats_cache,
})
}

Expand Down Expand Up @@ -263,6 +271,10 @@ impl Collection {

let locked_shard_holder = Arc::new(LockedShardHolder::new(shard_holder));

let local_stats_cache = LocalDataStatsCache::new_with_values(
Self::calculate_segment_stats(&locked_shard_holder).await,
);

Self {
id: collection_id.clone(),
shards_holder: locked_shard_holder,
Expand All @@ -285,6 +297,7 @@ impl Collection {
update_runtime: update_runtime.unwrap_or_else(Handle::current),
search_runtime: search_runtime.unwrap_or_else(Handle::current),
optimizer_cpu_budget,
local_stats_cache,
}
}

Expand Down Expand Up @@ -784,6 +797,31 @@ impl Collection {
pub async fn trigger_optimizers(&self) {
    // The read guard on the shard holder stays alive until the trigger call has completed.
    let shard_holder = self.shards_holder.read().await;
    shard_holder.trigger_optimizers().await;
}

/// Takes the read lock on `shards_holder` and returns the local segment statistics it
/// calculates.
async fn calculate_segment_stats(shards_holder: &Arc<RwLock<ShardHolder>>) -> LocalDataStats {
    shards_holder
        .read()
        .await
        .calculate_local_segments_stats()
        .await
}

/// Refreshes the cached local data statistics when the cache reports that a refresh is due.
///
/// Every call advances the cache's request counter. Returns `Some(..)` holding the freshly
/// calculated stats if the cache was refreshed, and `None` if the cached values were left
/// untouched.
async fn check_and_update_local_size_stats(&self) -> Option<LocalDataStats> {
    if !self.local_stats_cache.check_need_update_and_increment() {
        return None;
    }

    let fresh_stats = Self::calculate_segment_stats(&self.shards_holder).await;
    self.local_stats_cache.update(fresh_stats);
    Some(fresh_stats)
}

/// Estimated size of this collection's local vector storage.
///
/// The value comes from the stats cache, which is refreshed on this call when a refresh is due;
/// otherwise the previously cached (possibly outdated) value is returned.
pub async fn estimated_local_vector_storage_size(&self) -> usize {
    match self.check_and_update_local_size_stats().await {
        // A refresh just happened: use the fresh value directly.
        Some(fresh_stats) => fresh_stats.vector_storage_size,
        None => self.local_stats_cache.get_vector_storage(),
    }
}
}

struct CollectionVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -944,9 +944,14 @@ impl SegmentEntry for ProxySegment {
SegmentType::Special
}

fn size_info(&self) -> SegmentInfo {
    // Size estimations simply reuse the complete `.info()` result here; this keeps the
    // estimation code simple instead of maintaining a separate size-only computation.
    self.info()
}

fn info(&self) -> SegmentInfo {
let wrapped_info = self.wrapped_segment.get().read().info();
let write_info = self.write_segment.get().read().info();
let write_info = self.write_segment.get().read().size_info(); // Only fields set by `size_info()` needed!

let vector_name_count =
self.config().vector_data.len() + self.config().sparse_vector_data.len();
Expand Down
65 changes: 65 additions & 0 deletions lib/collection/src/common/local_data_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::sync::atomic::{AtomicUsize, Ordering};

/// Number of requests counted between two refreshes of the cached data.
const UPDATE_INTERVAL: usize = 32;

/// Cache for `LocalDataStats` values, kept in `AtomicUsize` fields so it can be read and
/// refreshed through a shared reference.
#[derive(Default)]
pub(crate) struct LocalDataStatsCache {
    vector_storage_size: AtomicUsize,

    request_counter: AtomicUsize,
}

impl LocalDataStatsCache {
    /// Builds a cache that is pre-filled with the values of `stats`.
    pub fn new_with_values(stats: LocalDataStats) -> Self {
        // Exhaustive pattern on purpose: a field added to `LocalDataStats` must be handled here.
        let LocalDataStats {
            vector_storage_size: initial_size,
        } = stats;

        Self {
            vector_storage_size: AtomicUsize::new(initial_size),
            // Start at 1 so the first request does not reload the data that was just provided.
            request_counter: AtomicUsize::new(1),
        }
    }

    /// Tells the caller whether the cached values should be refreshed now.
    ///
    /// The request counter is incremented on every call, on the assumption that a cached value
    /// is read right afterwards. The result is `true` whenever the counter value from before
    /// the increment is a multiple of `UPDATE_INTERVAL`.
    pub fn check_need_update_and_increment(&self) -> bool {
        let seen_requests = self.request_counter.fetch_add(1, Ordering::Relaxed);
        seen_requests % UPDATE_INTERVAL == 0
    }

    /// Overwrites every cached value with the corresponding value of `new_stats`.
    pub fn update(&self, new_stats: LocalDataStats) {
        // Exhaustive pattern on purpose, see `new_with_values`.
        let LocalDataStats {
            vector_storage_size: refreshed_size,
        } = new_stats;

        self.vector_storage_size
            .store(refreshed_size, Ordering::Relaxed);
    }

    /// Returns the cached estimation of the vector storage size.
    pub fn get_vector_storage(&self) -> usize {
        self.vector_storage_size.load(Ordering::Relaxed)
    }
}

/// Statistics for local data, like the size of vector storage.
#[derive(Clone, Copy, Debug, Default)]
pub struct LocalDataStats {
    /// Estimated size of the vector storage.
    pub vector_storage_size: usize,
}

impl LocalDataStats {
    /// Adds every value of `other` onto the matching value of `self`.
    ///
    /// Sums saturate at `usize::MAX` instead of overflowing: a plain `+=` would panic in debug
    /// builds and silently wrap around in release builds, and a wrapped (tiny) total would be a
    /// far worse estimate than a capped one.
    pub fn accumulate_from(&mut self, other: &Self) {
        // Exhaustive pattern on purpose: a newly added field must be accumulated here as well.
        let LocalDataStats {
            vector_storage_size,
        } = other;

        self.vector_storage_size = self.vector_storage_size.saturating_add(*vector_storage_size);
    }
}
1 change: 1 addition & 0 deletions lib/collection/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod eta_calculator;
pub mod fetch_vectors;
pub mod file_utils;
pub mod is_ready;
pub mod local_data_stats;
pub mod retrieve_request_trait;
pub mod sha_256;
pub mod snapshot_stream;
Expand Down
5 changes: 3 additions & 2 deletions lib/collection/src/operations/verification/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ impl StrictModeVerification for DiscoverRequestInternal {
}

impl StrictModeVerification for DiscoverRequestBatch {
fn check_strict_mode(
async fn check_strict_mode(
&self,
collection: &Collection,
strict_mode_config: &StrictModeConfig,
) -> Result<(), CollectionError> {
for i in self.searches.iter() {
i.discover_request
.check_strict_mode(collection, strict_mode_config)?;
.check_strict_mode(collection, strict_mode_config)
.await?;
}

Ok(())
Expand Down
24 changes: 16 additions & 8 deletions lib/collection/src/operations/verification/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub struct VerificationPass {
/// This trait ignores the `enabled` parameter in `StrictModeConfig`.
pub trait StrictModeVerification {
/// Implementing this method allows adding a custom check for request-specific values.
fn check_custom(
#[allow(async_fn_in_trait)]
async fn check_custom(
&self,
_collection: &Collection,
_strict_mode_config: &StrictModeConfig,
Expand Down Expand Up @@ -86,13 +87,14 @@ pub trait StrictModeVerification {
}

/// Checks search parameters.
fn check_search_params(
#[allow(async_fn_in_trait)]
async fn check_search_params(
&self,
collection: &Collection,
strict_mode_config: &StrictModeConfig,
) -> Result<(), CollectionError> {
if let Some(search_params) = self.request_search_params() {
search_params.check_strict_mode(collection, strict_mode_config)?;
Box::pin(search_params.check_strict_mode(collection, strict_mode_config)).await?;
}
Ok(())
}
Expand Down Expand Up @@ -140,16 +142,18 @@ pub trait StrictModeVerification {

/// Does the verification of all configured parameters. Only implement this function if you know what
/// you are doing. In most cases implementing `check_custom` is sufficient.
fn check_strict_mode(
#[allow(async_fn_in_trait)]
async fn check_strict_mode(
&self,
collection: &Collection,
strict_mode_config: &StrictModeConfig,
) -> Result<(), CollectionError> {
self.check_custom(collection, strict_mode_config)?;
self.check_custom(collection, strict_mode_config).await?;
self.check_request_query_limit(strict_mode_config)?;
self.check_request_filter(collection, strict_mode_config)?;
self.check_request_exact(strict_mode_config)?;
self.check_search_params(collection, strict_mode_config)?;
self.check_search_params(collection, strict_mode_config)
.await?;
Ok(())
}
}
Expand Down Expand Up @@ -196,7 +200,7 @@ pub(crate) fn check_limit_opt<T: PartialOrd + Display>(
}

impl StrictModeVerification for SearchParams {
fn check_custom(
async fn check_custom(
&self,
_collection: &Collection,
strict_mode_config: &StrictModeConfig,
Expand Down Expand Up @@ -338,6 +342,7 @@ mod test {
let strict_mode_config = collection.strict_mode_config().await.unwrap();
let error = request
.check_strict_mode(collection, &strict_mode_config)
.await
.expect_err("Expected strict mode error but got Ok() value");
if !matches!(error, CollectionError::StrictMode { .. }) {
panic!("Expected strict mode error but got {error:#}");
Expand All @@ -349,7 +354,9 @@ mod test {
collection: &Collection,
) {
let strict_mode_config = collection.strict_mode_config().await.unwrap();
let res = request.check_strict_mode(collection, &strict_mode_config);
let res = request
.check_strict_mode(collection, &strict_mode_config)
.await;
if let Err(CollectionError::StrictMode { description }) = res {
panic!("Strict mode check should've passed but failed with error: {description:?}");
} else if res.is_err() {
Expand Down Expand Up @@ -401,6 +408,7 @@ mod test {
search_allow_exact: Some(false),
search_max_oversampling: Some(0.2),
upsert_max_batchsize: None,
max_collection_vector_size_bytes: None,
};

fixture_collection(&strict_mode_config).await
Expand Down
10 changes: 6 additions & 4 deletions lib/collection/src/operations/verification/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ use crate::operations::universal_query::collection_query::{
};

impl StrictModeVerification for QueryRequestInternal {
fn check_custom(
async fn check_custom(
&self,
collection: &Collection,
strict_mode_config: &StrictModeConfig,
) -> Result<(), crate::operations::types::CollectionError> {
if let Some(prefetch) = &self.prefetch {
for prefetch in prefetch {
prefetch.check_strict_mode(collection, strict_mode_config)?;
prefetch
.check_strict_mode(collection, strict_mode_config)
.await?;
}
}

Expand Down Expand Up @@ -44,15 +46,15 @@ impl StrictModeVerification for QueryRequestInternal {
}

impl StrictModeVerification for Prefetch {
fn check_custom(
async fn check_custom(
&self,
collection: &Collection,
strict_mode_config: &StrictModeConfig,
) -> Result<(), crate::operations::types::CollectionError> {
// Prefetch.prefetch is of type Prefetch (recursive type)
if let Some(prefetch) = &self.prefetch {
for prefetch in prefetch {
prefetch.check_strict_mode(collection, strict_mode_config)?;
Box::pin(prefetch.check_strict_mode(collection, strict_mode_config)).await?;
}
}

Expand Down
Loading

0 comments on commit 0702854

Please sign in to comment.