diff --git a/docs/grpc/docs.md b/docs/grpc/docs.md index e325aff7edd..3ead93a7390 100644 --- a/docs/grpc/docs.md +++ b/docs/grpc/docs.md @@ -12,6 +12,7 @@ - [CollectionInfo.PayloadSchemaEntry](#qdrant-CollectionInfo-PayloadSchemaEntry) - [CollectionOperationResponse](#qdrant-CollectionOperationResponse) - [CollectionParams](#qdrant-CollectionParams) + - [CollectionParamsDiff](#qdrant-CollectionParamsDiff) - [CreateAlias](#qdrant-CreateAlias) - [CreateCollection](#qdrant-CreateCollection) - [DeleteAlias](#qdrant-DeleteAlias) @@ -273,6 +274,22 @@ | shard_number | [uint32](#uint32) | | Number of shards in collection | | on_disk_payload | [bool](#bool) | | If true - point's payload will not be stored in memory | | vectors_config | [VectorsConfig](#qdrant-VectorsConfig) | optional | Configuration for vectors | +| replication_factor | [uint32](#uint32) | | Number of replicas of each shard that network tries to maintain | + + + + + + + + +### CollectionParamsDiff + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| replication_factor | [uint32](#uint32) | optional | Number of replicas of each shard that network tries to maintain | @@ -311,6 +328,7 @@ | on_disk_payload | [bool](#bool) | optional | If true - point's payload will not be stored in memory | | timeout | [uint64](#uint64) | optional | Wait timeout for operation commit in seconds, if not specified - default value will be supplied | | vectors_config | [VectorsConfig](#qdrant-VectorsConfig) | optional | Configuration for vectors | +| replication_factor | [uint32](#uint32) | optional | Number of replicas of each shard that network tries to maintain, default = 1 | @@ -543,6 +561,7 @@ If indexation speed have more priority for your - make this parameter lower. If | collection_name | [string](#string) | | Name of the collection | | optimizers_config | [OptimizersConfigDiff](#qdrant-OptimizersConfigDiff) | optional | New configuration parameters for the collection | | timeout | [uint64](#uint64) | optional | Wait timeout for operation commit in seconds, if not specified - default value will be supplied | +| params | [CollectionParamsDiff](#qdrant-CollectionParamsDiff) | optional | New configuration parameters for the collection | diff --git a/docs/redoc/master/openapi.json b/docs/redoc/master/openapi.json index b69750489a5..6e609411f05 100644 --- a/docs/redoc/master/openapi.json +++ b/docs/redoc/master/openapi.json @@ -2675,6 +2675,13 @@ "format": "uint32", "minimum": 1 }, + "replication_factor": { + "description": "Number of replicas for each shard", + "default": 1, + "type": "integer", + "format": "uint32", + "minimum": 1 + }, "on_disk_payload": { "description": "If true - point's payload will not be stored in memory. It will be read from the disk every time it is requested. This setting saves RAM by (slightly) increasing the response time. Note: those payload values that are involved in filtering and are indexed - remain in RAM.", "default": false, @@ -3833,6 +3840,14 @@ "minimum": 0, "nullable": true }, + "replication_factor": { + "description": "Number of shards replicas. Default is 1 Minimum is 1", + "default": null, + "type": "integer", + "format": "uint32", + "minimum": 0, + "nullable": true + }, "on_disk_payload": { "description": "If true - point's payload will not be stored in memory. It will be read from the disk every time it is requested. This setting saves RAM by (slightly) increasing the response time. 
Note: those payload values that are involved in filtering and are indexed - remain in RAM.", "default": null, @@ -3993,6 +4008,29 @@ "nullable": true } ] + }, + "params": { + "description": "Collection base params. If none - values from service configuration file are used.", + "anyOf": [ + { + "$ref": "#/components/schemas/CollectionParamsDiff" + }, + { + "nullable": true + } + ] + } + } + }, + "CollectionParamsDiff": { + "type": "object", + "properties": { + "replication_factor": { + "description": "Number of replicas for each shard", + "type": "integer", + "format": "uint32", + "minimum": 1, + "nullable": true } } }, diff --git a/lib/api/src/grpc/proto/collections.proto b/lib/api/src/grpc/proto/collections.proto index 405aa3cc6be..da5cfcdd9c1 100644 --- a/lib/api/src/grpc/proto/collections.proto +++ b/lib/api/src/grpc/proto/collections.proto @@ -157,12 +157,14 @@ message CreateCollection { optional bool on_disk_payload = 8; // If true - point's payload will not be stored in memory optional uint64 timeout = 9; // Wait timeout for operation commit in seconds, if not specified - default value will be supplied optional VectorsConfig vectors_config = 10; // Configuration for vectors + optional uint32 replication_factor = 11; // Number of replicas of each shard that network tries to maintain, default = 1 } message UpdateCollection { string collection_name = 1; // Name of the collection optional OptimizersConfigDiff optimizers_config = 2; // New configuration parameters for the collection optional uint64 timeout = 3; // Wait timeout for operation commit in seconds, if not specified - default value will be supplied + optional CollectionParamsDiff params = 4; // New configuration parameters for the collection } message DeleteCollection { @@ -181,6 +183,11 @@ message CollectionParams { uint32 shard_number = 3; // Number of shards in collection bool on_disk_payload = 4; // If true - point's payload will not be stored in memory optional VectorsConfig vectors_config = 5; // Configuration for vectors + uint32 replication_factor = 6; // Number of replicas of each shard that network tries to maintain +} + +message CollectionParamsDiff { + optional uint32 replication_factor = 1; // Number of replicas of each shard that network tries to maintain } message CollectionConfig { diff --git a/lib/api/src/grpc/qdrant.rs b/lib/api/src/grpc/qdrant.rs index c94eded4645..0a8bac9fd1d 100644 --- a/lib/api/src/grpc/qdrant.rs +++ b/lib/api/src/grpc/qdrant.rs @@ -175,6 +175,9 @@ pub struct CreateCollection { /// Configuration for vectors #[prost(message, optional, tag="10")] pub vectors_config: ::core::option::Option, + /// Number of replicas of each shard that network tries to maintain, default = 1 + #[prost(uint32, optional, tag="11")] + pub replication_factor: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct UpdateCollection { @@ -187,6 +190,9 @@ pub struct UpdateCollection { /// Wait timeout for operation commit in seconds, if not specified - default value will be supplied #[prost(uint64, optional, tag="3")] pub timeout: ::core::option::Option, + /// New configuration parameters for the collection + #[prost(message, optional, tag="4")] + pub params: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct DeleteCollection { @@ -217,6 +223,15 @@ pub struct CollectionParams { /// Configuration for vectors #[prost(message, optional, tag="5")] pub vectors_config: ::core::option::Option, + /// Number of replicas of each shard that network tries to maintain + 
#[prost(uint32, tag="6")] + pub replication_factor: u32, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CollectionParamsDiff { + /// Number of replicas of each shard that network tries to maintain + #[prost(uint32, optional, tag="1")] + pub replication_factor: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CollectionConfig { diff --git a/lib/collection/src/collection.rs b/lib/collection/src/collection.rs index 8ad6408a7ea..e54c7b84919 100644 --- a/lib/collection/src/collection.rs +++ b/lib/collection/src/collection.rs @@ -1,7 +1,7 @@ use std::cmp::max; use std::collections::{HashMap, HashSet}; use std::future::Future; -use std::num::NonZeroU32; +use std::ops::Deref; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -35,11 +35,11 @@ use crate::operations::types::{ }; use crate::operations::{CollectionUpdateOperations, Validate}; use crate::optimizers_builder::OptimizersConfig; -use crate::shard::collection_shard_distribution::CollectionShardDistribution; +use crate::shard::collection_shard_distribution::{self, CollectionShardDistribution}; use crate::shard::local_shard::LocalShard; use crate::shard::remote_shard::RemoteShard; use crate::shard::replica_set::ReplicaSet; -use crate::shard::shard_config::{ShardConfig, ShardType}; +use crate::shard::shard_config::{self, ShardConfig}; use crate::shard::shard_holder::{LockedShardHolder, ShardHolder}; use crate::shard::shard_versioning::versioned_shard_path; use crate::shard::transfer::shard_transfer::{ @@ -80,8 +80,10 @@ impl Collection { self.id.clone() } + #[allow(clippy::too_many_arguments)] pub async fn new( id: CollectionId, + this_peer_id: PeerId, path: &Path, snapshots_path: &Path, config: &CollectionConfig, @@ -97,38 +99,53 @@ impl Collection { let mut shard_holder = ShardHolder::new(path, HashRing::fair(HASH_RING_SHARD_SCALE))?; let shared_config = Arc::new(RwLock::new(config.clone())); - for shard_id in shard_distribution.local { - let shard_path = create_shard_dir(path, shard_id).await; - let shard = match shard_path { - Ok(shard_path) => { - LocalShard::build(shard_id, id.clone(), &shard_path, shared_config.clone()) - .await + for (shard_id, shard_type) in shard_distribution.shards { + let shard = match shard_type { + collection_shard_distribution::ShardType::Local => { + let shard_path = create_shard_dir(path, shard_id).await; + let shard = match shard_path { + Ok(shard_path) => { + LocalShard::build( + shard_id, + id.clone(), + &shard_path, + shared_config.clone(), + ) + .await + } + Err(e) => Err(e), + }; + shard.map(Shard::Local) } - Err(e) => Err(e), - }; - - let shard = match shard { - Ok(shard) => shard, - Err(err) => { - shard_holder.before_drop().await; - return Err(err); + collection_shard_distribution::ShardType::Remote(peer_id) => { + create_shard_dir(path, shard_id) + .await + .and_then(|shard_path| { + RemoteShard::init( + shard_id, + id.clone(), + peer_id, + shard_path, + channel_service.clone(), + ) + }) + .map(Shard::Remote) } - }; - shard_holder.add_shard(shard_id, Shard::Local(shard)); - } - - for (shard_id, peer_id) in shard_distribution.remote { - let shard = create_shard_dir(path, shard_id) - .await - .and_then(|shard_path| { - RemoteShard::init( + collection_shard_distribution::ShardType::ReplicaSet { local, remote } => { + ReplicaSet::build( shard_id, id.clone(), - peer_id, - shard_path, - channel_service.clone(), + this_peer_id, + local, + remote, + on_replica_failure.clone(), + path, + shared_config.clone(), ) - }); + .await + .map(Shard::ReplicaSet) + } 
+ }; let shard = match shard { Ok(shard) => shard, Err(err) => { @@ -136,22 +153,7 @@ impl Collection { return Err(err); } }; - shard_holder.add_shard(shard_id, Shard::Remote(shard)); - } - // This is a stub to check that types and hidden lifetimes fit - // It might be worth leaving it here for now until we have an actual impl - // TODO: Create ReplicaSet shards - if false { - let shard = ReplicaSet::new( - 1, - 1, - Default::default(), - Default::default(), - Default::default(), - 0.0, - on_replica_failure, - ); - shard_holder.add_shard(0, Shard::ReplicaSet(shard)) + shard_holder.add_shard(shard_id, shard); } let locked_shard_holder = Arc::new(LockedShardHolder::new(shard_holder)); @@ -1037,24 +1039,22 @@ impl Collection { Ok(points) } + #[allow(unreachable_code, clippy::diverging_sub_expression)] pub async fn update_params_from_diff( &self, params_diff: CollectionParamsDiff, ) -> CollectionResult<()> { let mut config = self.config.write().await; - let old_repl_factor = config.params.replication_factor; config.params = params_diff.update(&config.params)?; - self.handle_repl_factor_change(old_repl_factor, config.params.replication_factor); + self.handle_replica_changes(todo!("supply replica changes")); Ok(()) } - pub fn handle_repl_factor_change(&self, old: NonZeroU32, new: NonZeroU32) { - if old != new { - // TODO: remove or add replicas. In case of replica addition: - // 1. Create and mark them as inactive - // 2. Copy data - // 3. Mark them as active - } + pub fn handle_replica_changes(&self, _replica_changes: HashSet) { + // TODO: remove or add replicas. In case of replica addition: + // 1. Create and mark them as inactive + // 2. Copy data + // 3. Mark them as active } /// Updates shard optimization params: @@ -1240,7 +1240,7 @@ impl Collection { .map(|(shard_id, shard)| { let shard_info = match shard { Shard::ReplicaSet(replicas) => ShardInfo::ReplicaSet { - replicas: replicas.replica_state.clone(), + replicas: replicas.replica_state.deref().clone(), }, shard => ShardInfo::Single( *shard @@ -1370,9 +1370,11 @@ impl Collection { let shard_config_opt = ShardConfig::load(&shard_path)?; if let Some(shard_config) = shard_config_opt { match shard_config.r#type { - ShardType::Local => LocalShard::restore_snapshot(&shard_path)?, - ShardType::Remote { .. } => RemoteShard::restore_snapshot(&shard_path), - ShardType::Temporary => {} + shard_config::ShardType::Local => LocalShard::restore_snapshot(&shard_path)?, + shard_config::ShardType::Remote { .. 
} => { + RemoteShard::restore_snapshot(&shard_path) + } + shard_config::ShardType::Temporary => {} } } else { return Err(CollectionError::service_error(format!( diff --git a/lib/collection/src/collection_state.rs b/lib/collection/src/collection_state.rs index 614b0ada49a..2c0d46e8478 100644 --- a/lib/collection/src/collection_state.rs +++ b/lib/collection/src/collection_state.rs @@ -64,6 +64,7 @@ impl State { Ok(()) } + #[allow(unreachable_code, clippy::diverging_sub_expression)] async fn apply_config( new_config: CollectionConfig, collection: &Collection, @@ -74,9 +75,8 @@ impl State { .await?; // updating replication factor let mut config = collection.config.write().await; - let old_repl_factor = config.params.replication_factor; config.params.replication_factor = new_config.params.replication_factor; - collection.handle_repl_factor_change(old_repl_factor, config.params.replication_factor); + collection.handle_replica_changes(todo!("Calculate and add changes")); Ok(()) } diff --git a/lib/collection/src/config.rs b/lib/collection/src/config.rs index 892d02c87c2..e0e610cb985 100644 --- a/lib/collection/src/config.rs +++ b/lib/collection/src/config.rs @@ -53,8 +53,6 @@ pub struct CollectionParams { #[serde(default = "default_shard_number")] pub shard_number: NonZeroU32, /// Number of replicas for each shard - // TODO: do not skip in v1.0 (when replication ships) - #[serde(skip)] #[serde(default = "default_replication_factor")] pub replication_factor: NonZeroU32, /// If true - point's payload will not be stored in memory. diff --git a/lib/collection/src/operations/conversions.rs b/lib/collection/src/operations/conversions.rs index a8ca9a1950a..7553c8923e6 100644 --- a/lib/collection/src/operations/conversions.rs +++ b/lib/collection/src/operations/conversions.rs @@ -7,10 +7,8 @@ use segment::data_types::vectors::{NamedVector, VectorStruct, DEFAULT_VECTOR_NAM use segment::types::Distance; use tonic::Status; -use crate::config::{ - default_replication_factor, CollectionConfig, CollectionParams, VectorParams, VectorsConfig, - WalConfig, -}; +use super::config_diff::CollectionParamsDiff; +use crate::config::{CollectionConfig, CollectionParams, VectorParams, VectorsConfig, WalConfig}; use crate::operations::config_diff::{HnswConfigDiff, OptimizersConfigDiff, WalConfigDiff}; use crate::operations::point_ops::PointsSelector::PointIdsSelector; use crate::operations::point_ops::{ @@ -42,6 +40,22 @@ impl From for WalConfigDiff { } } +impl TryFrom for CollectionParamsDiff { + type Error = Status; + + fn try_from(value: api::grpc::qdrant::CollectionParamsDiff) -> Result { + Ok(Self { + replication_factor: value + .replication_factor + .map(|factor| { + NonZeroU32::new(factor) + .ok_or_else(|| Status::invalid_argument("`replication_factor` cannot be 0")) + }) + .transpose()?, + }) + } +} + impl From for OptimizersConfigDiff { fn from(value: api::grpc::qdrant::OptimizersConfigDiff) -> Self { Self { @@ -115,6 +129,7 @@ impl From for api::grpc::qdrant::CollectionInfo { Some(api::grpc::qdrant::VectorsConfig { config }) }, shard_number: config.params.shard_number.get(), + replication_factor: config.params.replication_factor.get(), on_disk_payload: config.params.on_disk_payload, }), hnsw_config: Some(api::grpc::qdrant::HnswConfigDiff { @@ -243,43 +258,40 @@ impl TryFrom for CollectionConfig { Ok(Self { params: match config.params { None => return Err(Status::invalid_argument("Malformed CollectionParams type")), - Some(params) => { - CollectionParams { - vectors: match params.vectors_config { + Some(params) => 
CollectionParams { + vectors: match params.vectors_config { + None => { + return Err(Status::invalid_argument( + "Expected `vectors` - configuration for vector storage", + )) + } + Some(vector_config) => match vector_config.config { None => { return Err(Status::invalid_argument( "Expected `vectors` - configuration for vector storage", )) } - Some(vector_config) => match vector_config.config { - None => { - return Err(Status::invalid_argument( - "Expected `vectors` - configuration for vector storage", - )) - } - Some(api::grpc::qdrant::vectors_config::Config::Params(params)) => { - VectorsConfig::Single(params.try_into()?) - } - Some(api::grpc::qdrant::vectors_config::Config::ParamsMap( - params_map, - )) => VectorsConfig::Multi( - params_map - .map - .into_iter() - .map(|(k, v)| Ok((k, v.try_into()?))) - .collect::, Status>>( - )?, - ), - }, + Some(api::grpc::qdrant::vectors_config::Config::Params(params)) => { + VectorsConfig::Single(params.try_into()?) + } + Some(api::grpc::qdrant::vectors_config::Config::ParamsMap( + params_map, + )) => VectorsConfig::Multi( + params_map + .map + .into_iter() + .map(|(k, v)| Ok((k, v.try_into()?))) + .collect::, Status>>()?, + ), }, - shard_number: NonZeroU32::new(params.shard_number).ok_or_else(|| { - Status::invalid_argument("`shard_number` cannot be zero") - })?, - on_disk_payload: params.on_disk_payload, - // TODO: use `repliction_factor` from `config` - replication_factor: default_replication_factor(), - } - } + }, + shard_number: NonZeroU32::new(params.shard_number) + .ok_or_else(|| Status::invalid_argument("`shard_number` cannot be zero"))?, + on_disk_payload: params.on_disk_payload, + replication_factor: NonZeroU32::new(params.replication_factor).ok_or_else( + || Status::invalid_argument("`replication_factor` cannot be zero"), + )?, + }, }, hnsw_config: match config.hnsw_config { None => return Err(Status::invalid_argument("Malformed HnswConfig type")), diff --git a/lib/collection/src/save_on_disk.rs b/lib/collection/src/save_on_disk.rs index 63b741cd28f..e2bb97b2428 100644 --- a/lib/collection/src/save_on_disk.rs +++ b/lib/collection/src/save_on_disk.rs @@ -1,6 +1,6 @@ use std::fs::File; use std::io::BufWriter; -use std::ops::Deref; +use std::ops::{Deref, DerefMut}; use std::path::PathBuf; use atomicwrites::OverwriteBehavior::AllowOverwrite; @@ -56,7 +56,7 @@ impl Deserialize<'de>> SaveOnDisk { Ok(output) } - fn save(&self) -> Result<(), Error> { + pub fn save(&self) -> Result<(), Error> { AtomicFile::new(&self.path, AllowOverwrite).write(|file| { let writer = BufWriter::new(file); serde_json::to_writer(writer, &self.data) @@ -73,6 +73,12 @@ impl Deref for SaveOnDisk { } } +impl DerefMut for SaveOnDisk { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } +} + #[cfg(test)] mod tests { use std::fs; diff --git a/lib/collection/src/shard/collection_shard_distribution.rs b/lib/collection/src/shard/collection_shard_distribution.rs index d2dee49184e..1df829f4902 100644 --- a/lib/collection/src/shard/collection_shard_distribution.rs +++ b/lib/collection/src/shard/collection_shard_distribution.rs @@ -1,55 +1,69 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use crate::collection_state::ShardInfo; use crate::shard::{PeerId, ShardId}; +#[derive(Debug)] +pub enum ShardType { + Local, + Remote(PeerId), + ReplicaSet { + local: bool, + remote: HashSet, + }, +} + #[derive(Debug)] pub struct CollectionShardDistribution { - pub local: Vec, - pub remote: Vec<(ShardId, PeerId)>, + pub shards: HashMap, } impl 
CollectionShardDistribution { - pub fn new(local: Vec, remote: Vec<(ShardId, PeerId)>) -> Self { - Self { local, remote } - } - pub fn all_local(shard_number: Option) -> Self { Self { // This method is called only when distributed deployment is disabled // so if not specified it will suggest 1 shard per collection for better performance. - local: (0..shard_number.unwrap_or(1)).collect(), - remote: vec![], + shards: (0..shard_number.unwrap_or(1)) + .map(|shard_id| (shard_id, ShardType::Local)) + .collect(), } } - pub fn from_shard_to_peer(this_peer: PeerId, shard_to_peer: &HashMap) -> Self { - let local = shard_to_peer - .iter() - .filter_map(|(shard, peer)| { - if peer == &this_peer { - Some(*shard) - } else { - None - } - }) - .collect(); - - let remote = shard_to_peer - .iter() - .filter_map(|(&shard, &peer)| { - if peer != this_peer { - Some((shard, peer)) - } else { - None - } - }) - .clone() - .collect(); - - Self { local, remote } + pub fn from_shards_info(this_peer: PeerId, shards_info: HashMap) -> Self { + Self { + shards: shards_info + .into_iter() + .map(|(shard_id, info)| match info { + ShardInfo::ReplicaSet { replicas } => ( + shard_id, + ShardType::ReplicaSet { + local: replicas.contains_key(&this_peer), + remote: replicas.into_keys().filter(|id| id != &this_peer).collect(), + }, + ), + ShardInfo::Single(peer_id) => { + if peer_id == this_peer { + (shard_id, ShardType::Local) + } else { + (shard_id, ShardType::Remote(peer_id)) + } + } + }) + .collect(), + } } pub fn shard_count(&self) -> usize { - self.local.len() + self.remote.len() + self.shards.len() + } + + pub fn shard_replica_count(&self) -> usize { + self.shards + .values() + .map(|shard| match shard { + ShardType::ReplicaSet { local, remote } => *local as usize + remote.len(), + _ => 1, + }) + .sum() } } diff --git a/lib/collection/src/shard/replica_set.rs b/lib/collection/src/shard/replica_set.rs index bc8144c9023..4c484a80825 100644 --- a/lib/collection/src/shard/replica_set.rs +++ b/lib/collection/src/shard/replica_set.rs @@ -1,6 +1,8 @@ -use std::collections::HashMap; +use std::cmp; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::Deref; +use std::path::Path; use std::sync::Arc; use futures::future::{try_join, try_join_all}; @@ -10,53 +12,95 @@ use segment::types::{ ExtendedPointId, Filter, ScoredPoint, WithPayload, WithPayloadInterface, WithVector, }; use tokio::runtime::Handle; +use tokio::sync::RwLock; use super::local_shard::{drop_and_delete_from_disk, LocalShard}; use super::remote_shard::RemoteShard; -use super::{PeerId, ShardId, ShardOperation}; +use super::{create_shard_dir, CollectionId, PeerId, ShardId, ShardOperation}; +use crate::config::CollectionConfig; use crate::operations::types::{ CollectionError, CollectionInfo, CollectionResult, CountRequest, CountResult, PointRequest, Record, SearchRequestBatch, UpdateResult, }; use crate::operations::CollectionUpdateOperations; +use crate::save_on_disk::SaveOnDisk; pub type IsActive = bool; pub type OnPeerFailure = - Box Box + Send> + Send + Sync>; + Arc Box + Send> + Send + Sync>; + +const READ_REMOTE_REPLICAS: u32 = 2; + +const REPLICA_STATE_FILE: &str = "replica_state"; + +/// Represents a change in replica set, due to scaling of `replication_factor` +pub enum Change { + Add(ShardId, PeerId), + Remove(ShardId, PeerId), +} /// A set of shard replicas. /// Handles operations so that the state is consistent across all the replicas of the shard. /// Prefers local shard for read-only operations. 
/// Perform updates on all replicas and report error if there is at least one failure. +/// +/// `ReplicaSet` should always have >= 2 replicas. +/// If a user decreases replication factor to 1 - it should be converted to just `Local` or `Remote` shard. pub struct ReplicaSet { shard_id: ShardId, this_peer_id: PeerId, local: Option, remotes: Vec, - pub(crate) replica_state: HashMap, - read_fan_out_ratio: f32, + pub(crate) replica_state: SaveOnDisk>, + /// Number of remote replicas to send read requests to. + /// If actual number of peers is less than this, then read request will be sent to all of them. + read_remote_replicas: u32, notify_peer_failure_cb: OnPeerFailure, } impl ReplicaSet { - pub fn new( + #[allow(clippy::too_many_arguments)] + pub async fn build( shard_id: ShardId, + collection_id: CollectionId, this_peer_id: PeerId, - local: Option, - remotes: Vec, - replica_state: HashMap, - read_fan_out_ratio: f32, + local: bool, + remotes: HashSet, on_peer_failure: OnPeerFailure, - ) -> Self { - Self { + collection_path: &Path, + shared_config: Arc>, + ) -> CollectionResult { + let shard_path = create_shard_dir(collection_path, shard_id).await?; + let local = if local { + let shard = + LocalShard::build(shard_id, collection_id, &shard_path, shared_config.clone()) + .await?; + Some(shard) + } else { + None + }; + let mut replica_state: SaveOnDisk> = + SaveOnDisk::load_or_init(shard_path.join(REPLICA_STATE_FILE))?; + replica_state.write(|rs| { + if local.is_some() { + rs.insert(this_peer_id, true); + } + for peer in remotes { + rs.insert(peer, true); + } + })?; + Ok(Self { shard_id, this_peer_id, local, - remotes, + // TODO: Initialize remote shards + // This requires logic to store several peer ids in remote shard file + remotes: Vec::new(), replica_state, - read_fan_out_ratio, + // TODO: move to collection config + read_remote_replicas: READ_REMOTE_REPLICAS, notify_peer_failure_cb: on_peer_failure, - } + }) } pub async fn notify_peer_failure(&self, peer_id: PeerId) { Box::into_pin(self.notify_peer_failure_cb.deref()(peer_id, self.shard_id)).await @@ -67,12 +111,13 @@ impl ReplicaSet { } pub fn set_active(&mut self, peer_id: &PeerId, active: bool) -> CollectionResult<()> { - *self - .replica_state - .get_mut(peer_id) - .ok_or_else(|| CollectionError::NotFound { - what: format!("Shard {} replica on peer {peer_id}", self.shard_id), - })? = active; + self.replica_state.write_with_res(|rs| { + *rs.get_mut(peer_id) + .ok_or_else(|| CollectionError::NotFound { + what: format!("Shard {} replica on peer {peer_id}", self.shard_id), + })? 
= active; + Ok::<(), CollectionError>(()) + })?; Ok(()) } @@ -110,6 +155,7 @@ impl ReplicaSet { todo!("Add remote replica") } } + self.replica_state.save()?; Ok(()) } @@ -152,8 +198,10 @@ impl ReplicaSet { ))); } - let fan_out_selection = - (self.read_fan_out_ratio * active_remote_shards.len() as f32).ceil() as usize; + let fan_out_selection = cmp::min( + active_remote_shards.len(), + self.read_remote_replicas as usize, + ); let mut futures = FuturesUnordered::new(); for remote in &active_remote_shards[0..fan_out_selection] { diff --git a/lib/collection/src/tests/snapshot_test.rs b/lib/collection/src/tests/snapshot_test.rs index fec5d66fe51..94dbcdc6103 100644 --- a/lib/collection/src/tests/snapshot_test.rs +++ b/lib/collection/src/tests/snapshot_test.rs @@ -1,4 +1,6 @@ +use std::collections::HashMap; use std::num::{NonZeroU32, NonZeroU64}; +use std::sync::Arc; use segment::types::Distance; use tempfile::Builder; @@ -6,7 +8,7 @@ use tempfile::Builder; use crate::collection::Collection; use crate::config::{CollectionConfig, CollectionParams, VectorParams, VectorsConfig, WalConfig}; use crate::optimizers_builder::OptimizersConfig; -use crate::shard::collection_shard_distribution::CollectionShardDistribution; +use crate::shard::collection_shard_distribution::{self, CollectionShardDistribution}; use crate::shard::replica_set::OnPeerFailure; use crate::shard::{ChannelService, Shard}; @@ -22,7 +24,7 @@ const TEST_OPTIMIZERS_CONFIG: OptimizersConfig = OptimizersConfig { }; pub fn dummy_on_replica_failure() -> OnPeerFailure { - Box::new(move |_peer_id, _shard_id| Box::new(async {})) + Arc::new(move |_peer_id, _shard_id| Box::new(async {})) } #[tokio::test] @@ -57,13 +59,18 @@ async fn test_snapshot_collection() { .unwrap(); let collection_name = "test".to_string(); let collection_name_rec = "test_rec".to_string(); + let mut shards = HashMap::new(); + shards.insert(0, collection_shard_distribution::ShardType::Local); + shards.insert(1, collection_shard_distribution::ShardType::Local); + shards.insert(2, collection_shard_distribution::ShardType::Remote(10_000)); let mut collection = Collection::new( collection_name, + 1, collection_dir.path(), snapshots_path.path(), &config, - CollectionShardDistribution::new(vec![0, 1], vec![(2, 10000)]), + CollectionShardDistribution { shards }, ChannelService::default(), dummy_on_replica_failure(), ) diff --git a/lib/collection/tests/common/mod.rs b/lib/collection/tests/common/mod.rs index 1fc9e840ad5..3211262ba53 100644 --- a/lib/collection/tests/common/mod.rs +++ b/lib/collection/tests/common/mod.rs @@ -2,6 +2,7 @@ use std::num::{NonZeroU32, NonZeroU64}; use std::path::Path; +use std::sync::Arc; use collection::collection::Collection; use collection::config::{CollectionConfig, CollectionParams, VectorParams, WalConfig}; @@ -68,7 +69,7 @@ pub async fn simple_collection_fixture(collection_path: &Path, shard_number: u32 } pub fn dummy_on_replica_failure() -> OnPeerFailure { - Box::new(move |_peer_id, _shard_id| Box::new(async {})) + Arc::new(move |_peer_id, _shard_id| Box::new(async {})) } /// Default to a collection with all the shards local @@ -80,6 +81,7 @@ pub async fn new_local_collection( ) -> Result { Collection::new( id, + 0, path, snapshots_path, config, diff --git a/lib/storage/src/content_manager/collection_meta_ops.rs b/lib/storage/src/content_manager/collection_meta_ops.rs index 5e37c8a1422..0dae3ca1b8f 100644 --- a/lib/storage/src/content_manager/collection_meta_ops.rs +++ b/lib/storage/src/content_manager/collection_meta_ops.rs @@ -1,5 +1,7 @@ use 
collection::config::VectorsConfig; -use collection::operations::config_diff::{HnswConfigDiff, OptimizersConfigDiff, WalConfigDiff}; +use collection::operations::config_diff::{ + CollectionParamsDiff, HnswConfigDiff, OptimizersConfigDiff, WalConfigDiff, +}; use collection::shard::{CollectionId, PeerId, ShardId, ShardTransfer}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -92,6 +94,11 @@ pub struct CreateCollection { /// Minimum is 1 #[serde(default)] pub shard_number: Option, + /// Number of shards replicas. + /// Default is 1 + /// Minimum is 1 + #[serde(default)] + pub replication_factor: Option, /// If true - point's payload will not be stored in memory. /// It will be read from the disk every time it is requested. /// This setting saves RAM by (slightly) increasing the response time. @@ -107,7 +114,7 @@ pub struct CreateCollection { } /// Operation for creating new collection and (optionally) specify index params -#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)] #[serde(rename_all = "snake_case")] pub struct CreateCollectionOperation { pub collection_name: String, @@ -122,10 +129,12 @@ pub struct UpdateCollection { /// Custom params for Optimizers. If none - values from service configuration file are used. /// This operation is blocking, it will only proceed ones all current optimizations are complete pub optimizers_config: Option, // ToDo: Allow updates for other configuration params as well + /// Collection base params. If none - values from service configuration file are used. + pub params: Option, } /// Operation for updating parameters of the existing collection -#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)] #[serde(rename_all = "snake_case")] pub struct UpdateCollectionOperation { pub collection_name: String, @@ -143,11 +152,11 @@ pub struct ChangeAliasesOperation { } /// Operation for deleting collection with given name -#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)] #[serde(rename_all = "snake_case")] pub struct DeleteCollectionOperation(pub String); -#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)] pub enum ShardTransferOperations { Start(ShardTransfer), Finish(ShardTransfer), @@ -158,7 +167,7 @@ pub enum ShardTransferOperations { } /// Sets the state of shard replica -#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)] pub struct SetShardReplicaState { pub collection_name: String, pub shard_id: ShardId, @@ -168,7 +177,7 @@ pub struct SetShardReplicaState { } /// Enumeration of all possible collection update operations -#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)] #[serde(rename_all = "snake_case")] pub enum CollectionMetaOperations { CreateCollection(CreateCollectionOperation), diff --git a/lib/storage/src/content_manager/consensus_state.rs b/lib/storage/src/content_manager/consensus_state.rs index 181131e73b0..6876b02d4c9 100644 --- a/lib/storage/src/content_manager/consensus_state.rs +++ b/lib/storage/src/content_manager/consensus_state.rs @@ 
-214,7 +214,10 @@ impl ConsensusState { } } else if entry.get_context().is_empty() { // Allow empty context for compatibility - // TODO: remove in the next version after 0.10 + log::warn!( + "Outdated peer addition entry found with index: {}", + entry.get_index() + ) } else { // Should not be reachable as it is checked in API return Err(StorageError::ServiceError { diff --git a/lib/storage/src/content_manager/conversions.rs b/lib/storage/src/content_manager/conversions.rs index 280ce5dce9a..278e9df9256 100644 --- a/lib/storage/src/content_manager/conversions.rs +++ b/lib/storage/src/content_manager/conversions.rs @@ -53,6 +53,7 @@ impl TryFrom for CollectionMetaOperations { optimizers_config: value.optimizers_config.map(|v| v.into()), shard_number: value.shard_number, on_disk_payload: value.on_disk_payload, + replication_factor: value.replication_factor, }, })) } @@ -65,7 +66,8 @@ impl TryFrom for CollectionMetaOperations { Ok(Self::UpdateCollection(UpdateCollectionOperation { collection_name: value.collection_name, update_collection: UpdateCollection { - optimizers_config: value.optimizers_config.map(|v| v.into()), + optimizers_config: value.optimizers_config.map(Into::into), + params: value.params.map(TryInto::try_into).transpose()?, }, })) } diff --git a/lib/storage/src/content_manager/shard_distribution.rs b/lib/storage/src/content_manager/shard_distribution.rs index ea453ad0b11..111ec1484ee 100644 --- a/lib/storage/src/content_manager/shard_distribution.rs +++ b/lib/storage/src/content_manager/shard_distribution.rs @@ -1,6 +1,8 @@ use std::cmp::Reverse; use std::collections::BinaryHeap; +use std::num::NonZeroU32; +use collection::shard::collection_shard_distribution::{CollectionShardDistribution, ShardType}; use collection::shard::{PeerId, ShardId}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -12,9 +14,9 @@ struct PeerShardCount { } impl PeerShardCount { - fn new(shard_count: usize, peer_id: PeerId) -> Self { + fn new(peer_id: PeerId) -> Self { Self { - shard_count, + shard_count: 0, peer_id, } } @@ -26,39 +28,36 @@ impl PeerShardCount { #[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)] pub struct ShardDistributionProposal { - pub distribution: Vec<(ShardId, PeerId)>, + /// A shard can be located on several peers if it has replicas + pub distribution: Vec<(ShardId, Vec)>, } impl ShardDistributionProposal { /// Builds a proposal for the distribution of shards. - /// It will propose to allocate shards so that all peers have the same number of shards at the end. + /// It will propose to allocate shards so that all peers have the same number of shards of this collection at the end. 
pub fn new( - config_shard_number: u32, + shard_number: NonZeroU32, + replication_factor: NonZeroU32, known_peers: &[PeerId], - known_shards: Vec<(ShardId, PeerId)>, ) -> Self { // min number of shard_count on top to make this a min-heap let mut min_heap: BinaryHeap> = BinaryHeap::with_capacity(known_peers.len()); - - // count number of existing shards per peers - for &peer in known_peers { - let shard_count_on_peer = known_shards - .iter() - .filter(|(_shard_id, peer_id)| *peer_id == peer) - .count(); - min_heap.push(Reverse(PeerShardCount::new(shard_count_on_peer, peer))) + for peer in known_peers { + min_heap.push(Reverse(PeerShardCount::new(*peer))); } - let mut distribution: Vec<(ShardId, PeerId)> = - Vec::with_capacity(config_shard_number as usize); - - // propose the peer with the least amount of existing shards to host the next shard - for shard_id in 0..config_shard_number { - let mut least_loaded_peer = min_heap.peek_mut().unwrap(); - let selected_peer = least_loaded_peer.0.peer_id; - least_loaded_peer.0.inc_shard_count(); - distribution.push((shard_id, selected_peer)); + let mut distribution = Vec::with_capacity(shard_number.get() as usize); + + for shard_id in 0..shard_number.get() { + let mut replicas = Vec::new(); + for _replica in 0..replication_factor.get() { + let mut least_loaded_peer = min_heap.peek_mut().unwrap(); + let selected_peer = least_loaded_peer.0.peer_id; + least_loaded_peer.0.inc_shard_count(); + replicas.push(selected_peer); + } + distribution.push((shard_id, replicas)) } Self { distribution } @@ -67,25 +66,48 @@ impl ShardDistributionProposal { pub fn local_shards_for(&self, peer_id: PeerId) -> Vec { self.distribution .iter() - .filter_map( - |(shard, peer)| { - if peer == &peer_id { - Some(*shard) - } else { - None - } - }, - ) + .filter_map(|(shard, peers)| { + if peers.contains(&peer_id) { + Some(shard) + } else { + None + } + }) + .copied() .collect() } - pub fn remote_shards_for(&self, peer_id: PeerId) -> Vec<(ShardId, PeerId)> { + pub fn remote_shards_for(&self, peer_id: PeerId) -> Vec<(ShardId, Vec)> { self.distribution .iter() - .filter(|(_shard, peer)| peer != &peer_id) - .copied() + .filter(|(_shard, peers)| !peers.contains(&peer_id)) + .cloned() .collect() } + + pub fn into(self, this_peer_id: PeerId) -> CollectionShardDistribution { + CollectionShardDistribution { + shards: self + .distribution + .into_iter() + .map(|(shard_id, peers)| match &peers[..] 
{ + [peer] if *peer == this_peer_id => (shard_id, ShardType::Local), + [peer] => (shard_id, ShardType::Remote(*peer)), + peers => ( + shard_id, + ShardType::ReplicaSet { + local: peers.contains(&this_peer_id), + remote: peers + .iter() + .copied() + .filter(|peer| peer != &this_peer_id) + .collect(), + }, + ), + }) + .collect(), + } + } } #[cfg(test)] @@ -95,18 +117,24 @@ mod tests { #[test] fn test_distribution() { let known_peers = vec![1, 2, 3, 4]; - let distribution = ShardDistributionProposal::new(6, &known_peers, vec![]); + let distribution = ShardDistributionProposal::new( + NonZeroU32::new(6).unwrap(), + NonZeroU32::new(1).unwrap(), + &known_peers, + ); // Check it distribution is as even as possible let mut shard_counts: Vec = vec![0; known_peers.len()]; - for (_shard_id, peer_id) in &distribution.distribution { - let peer_offset = known_peers - .iter() - .enumerate() - .find(|(_, x)| *x == peer_id) - .unwrap() - .0; - shard_counts[peer_offset] += 1; + for (_shard_id, peers) in &distribution.distribution { + for peer_id in peers { + let peer_offset = known_peers + .iter() + .enumerate() + .find(|(_, x)| *x == peer_id) + .unwrap() + .0; + shard_counts[peer_offset] += 1; + } } assert_eq!(shard_counts.iter().sum::(), 6); diff --git a/lib/storage/src/content_manager/toc.rs b/lib/storage/src/content_manager/toc.rs index ca0d647cc73..e21ac6ca65d 100644 --- a/lib/storage/src/content_manager/toc.rs +++ b/lib/storage/src/content_manager/toc.rs @@ -7,8 +7,8 @@ use std::sync::Arc; use collection::collection::Collection; use collection::collection_state; use collection::collection_state::ShardInfo; -use collection::config::{CollectionConfig, CollectionParams}; -use collection::operations::config_diff::{CollectionParamsDiff, DiffConfig}; +use collection::config::{default_replication_factor, CollectionConfig, CollectionParams}; +use collection::operations::config_diff::DiffConfig; use collection::operations::snapshot_ops::SnapshotDescription; use collection::operations::types::{ CountRequest, CountResult, PointRequest, RecommendRequest, RecommendRequestBatch, Record, @@ -208,6 +208,7 @@ impl TableOfContent { hnsw_config: hnsw_config_diff, wal_config: wal_config_diff, optimizers_config: optimizers_config_diff, + replication_factor, } = operation; self.collections @@ -226,6 +227,8 @@ impl TableOfContent { "If shard number was supplied then this exact number should be used in a distribution" ) } + let replication_factor = + replication_factor.unwrap_or_else(|| default_replication_factor().get()); let collection_params = CollectionParams { vectors, @@ -234,8 +237,11 @@ impl TableOfContent { description: "`shard_number` cannot be 0".to_string(), })?, on_disk_payload: on_disk_payload.unwrap_or(self.storage_config.on_disk_payload), - // TODO: use `replication_factor` supplied in `CreateCollection` - replication_factor: collection::config::default_replication_factor(), + replication_factor: NonZeroU32::new(replication_factor).ok_or( + StorageError::BadInput { + description: "`replication_factor` cannot be 0".to_string(), + }, + )?, }; let wal_config = match wal_config_diff { None => self.storage_config.wal.clone(), @@ -260,6 +266,7 @@ impl TableOfContent { }; let collection = Collection::new( collection_name.to_string(), + self.this_peer_id, &collection_path, &snapshots_path, &collection_config, @@ -279,7 +286,7 @@ impl TableOfContent { fn on_peer_failure_callback(&self, collection_name: String) -> replica_set::OnPeerFailure { let proposal_sender = self.consensus_proposal_sender.clone(); - 
Box::new(move |peer_id, shard_id| { + Arc::new(move |peer_id, shard_id| { let proposal_sender = proposal_sender.clone(); let collection_name = collection_name.clone(); Box::new(async move { @@ -303,9 +310,10 @@ impl TableOfContent { collection_name: &str, operation: UpdateCollection, ) -> Result { - let UpdateCollection { optimizers_config } = operation; - // TODO: get `params` from `UpdateCollection` - let params: Option = None; + let UpdateCollection { + optimizers_config, + params, + } = operation; let collection = self.get_collection(collection_name).await?; if let Some(diff) = optimizers_config { collection.update_optimizer_params_from_diff(diff).await? @@ -392,14 +400,14 @@ impl TableOfContent { operation: CollectionMetaOperations, ) -> Result { match operation { - CollectionMetaOperations::CreateCollectionDistributed(operation, distribution) => { - let local = distribution.local_shards_for(self.this_peer_id); - let remote = distribution.remote_shards_for(self.this_peer_id); - let collection_shard_distribution = CollectionShardDistribution::new(local, remote); + CollectionMetaOperations::CreateCollectionDistributed( + operation, + distribution_proposal, + ) => { self.create_collection( &operation.collection_name, operation.create_collection, - collection_shard_distribution, + distribution_proposal.into(self.this_peer_id), ) .await } @@ -835,21 +843,13 @@ impl TableOfContent { None => { let collection_path = self.create_collection_path(id).await?; let snapshots_path = self.create_snapshots_path(id).await?; - let shard_distribution = CollectionShardDistribution::from_shard_to_peer( + let shard_distribution = CollectionShardDistribution::from_shards_info( self.this_peer_id, - &state - .shards - .iter() - .map(|(id, info)| match info { - ShardInfo::ReplicaSet { replicas: _ } => { - todo!("Handle multiple replicas in shard distribution") - } - ShardInfo::Single(peer_id) => (*id, *peer_id), - }) - .collect(), + state.shards.clone(), ); let collection = Collection::new( id.to_string(), + self.this_peer_id, &collection_path, &snapshots_path, &state.config, @@ -899,11 +899,12 @@ impl TableOfContent { pub async fn suggest_shard_distribution( &self, op: &CreateCollectionOperation, - suggested_shard_number: u32, + suggested_shard_number: NonZeroU32, ) -> ShardDistributionProposal { let shard_number = op .create_collection .shard_number + .and_then(NonZeroU32::new) .unwrap_or(suggested_shard_number); let mut known_peers_set: HashSet<_> = self .channel_service @@ -914,8 +915,14 @@ impl TableOfContent { .collect(); known_peers_set.insert(self.this_peer_id()); let known_peers: Vec<_> = known_peers_set.into_iter().collect(); + let replication_factor = op + .create_collection + .replication_factor + .and_then(NonZeroU32::new) + .unwrap_or_else(default_replication_factor); - let shard_distribution = ShardDistributionProposal::new(shard_number, &known_peers, vec![]); + let shard_distribution = + ShardDistributionProposal::new(shard_number, replication_factor, &known_peers); log::debug!( "Suggesting distribution for {} shards for collection '{}' among {} peers {:?}", diff --git a/lib/storage/src/dispatcher.rs b/lib/storage/src/dispatcher.rs index 0de49aa5514..2d58e977adb 100644 --- a/lib/storage/src/dispatcher.rs +++ b/lib/storage/src/dispatcher.rs @@ -1,3 +1,4 @@ +use std::num::NonZeroU32; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; @@ -49,7 +50,11 @@ impl Dispatcher { let number_of_peers = state.0.peer_count(); let shard_distribution = self .toc - .suggest_shard_distribution(&op, 
number_of_peers as u32)
+                    .suggest_shard_distribution(
+                        &op,
+                        NonZeroU32::new(number_of_peers as u32)
+                            .expect("Peer count should be always >= 1"),
+                    )
                     .await;
                 CollectionMetaOperations::CreateCollectionDistributed(op, shard_distribution)
             }
diff --git a/lib/storage/tests/alias_tests.rs b/lib/storage/tests/alias_tests.rs
index 8e13ae913eb..283e438a722 100644
--- a/lib/storage/tests/alias_tests.rs
+++ b/lib/storage/tests/alias_tests.rs
@@ -80,6 +80,7 @@ mod tests {
                     optimizers_config: None,
                     shard_number: Some(1),
                     on_disk_payload: None,
+                    replication_factor: None,
                 },
             }),
             None,
diff --git a/src/consensus.rs b/src/consensus.rs
index c566a9f3659..be476c6dac2 100644
--- a/src/consensus.rs
+++ b/src/consensus.rs
@@ -269,11 +269,6 @@ impl Consensus {
         // This needs to be propagated manually to other peers as it is not contained in any log entry.
         state_ref.set_first_voter(all_peers.first_peer_id);
         state_ref.set_conf_state(ConfState::from((vec![all_peers.first_peer_id], vec![])))?;
-        // TODO: Remove in the next version after 0.10 as it does nothing and is left for compatability
-        client
-            .add_peer_as_participant(tonic::Request::new(api::grpc::qdrant::PeerId { id }))
-            .await
-            .context("Failed to add peer as participant")?;
         Ok(())
     }
 
@@ -767,6 +762,7 @@ mod tests {
                     optimizers_config: None,
                     shard_number: Some(2),
                     on_disk_payload: None,
+                    replication_factor: None,
                 },
             }),
             None,