diff --git a/docs/grpc/docs.md b/docs/grpc/docs.md
index e325aff7edd..3ead93a7390 100644
--- a/docs/grpc/docs.md
+++ b/docs/grpc/docs.md
@@ -12,6 +12,7 @@
- [CollectionInfo.PayloadSchemaEntry](#qdrant-CollectionInfo-PayloadSchemaEntry)
- [CollectionOperationResponse](#qdrant-CollectionOperationResponse)
- [CollectionParams](#qdrant-CollectionParams)
+ - [CollectionParamsDiff](#qdrant-CollectionParamsDiff)
- [CreateAlias](#qdrant-CreateAlias)
- [CreateCollection](#qdrant-CreateCollection)
- [DeleteAlias](#qdrant-DeleteAlias)
@@ -273,6 +274,22 @@
| shard_number | [uint32](#uint32) | | Number of shards in collection |
| on_disk_payload | [bool](#bool) | | If true - point's payload will not be stored in memory |
| vectors_config | [VectorsConfig](#qdrant-VectorsConfig) | optional | Configuration for vectors |
+| replication_factor | [uint32](#uint32) |  | Number of replicas of each shard that the network tries to maintain |
+
+
+
+
+
+
+
+
+### CollectionParamsDiff
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| replication_factor | [uint32](#uint32) | optional | Number of replicas of each shard that the network tries to maintain |
@@ -311,6 +328,7 @@
| on_disk_payload | [bool](#bool) | optional | If true - point's payload will not be stored in memory |
| timeout | [uint64](#uint64) | optional | Wait timeout for operation commit in seconds, if not specified - default value will be supplied |
| vectors_config | [VectorsConfig](#qdrant-VectorsConfig) | optional | Configuration for vectors |
+| replication_factor | [uint32](#uint32) | optional | Number of replicas of each shard that the network tries to maintain, default = 1 |
@@ -543,6 +561,7 @@ If indexation speed have more priority for your - make this parameter lower. If
| collection_name | [string](#string) | | Name of the collection |
| optimizers_config | [OptimizersConfigDiff](#qdrant-OptimizersConfigDiff) | optional | New configuration parameters for the collection |
| timeout | [uint64](#uint64) | optional | Wait timeout for operation commit in seconds, if not specified - default value will be supplied |
+| params | [CollectionParamsDiff](#qdrant-CollectionParamsDiff) | optional | New configuration parameters for the collection |
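
The new `CollectionParamsDiff` message is what lets `replication_factor` be changed after collection creation through the gRPC `UpdateCollection` call. A minimal sketch of building such a request with the generated Rust types from `qdrant.rs` further below (client setup and the surrounding service call are omitted and assumed):

```rust
use api::grpc::qdrant::{CollectionParamsDiff, UpdateCollection};

fn update_request() -> UpdateCollection {
    // Sketch: raise the replication factor of an existing collection to 2.
    // Only the fields present in the diff are changed; everything else stays as configured.
    UpdateCollection {
        collection_name: "my_collection".to_string(),
        optimizers_config: None,
        timeout: Some(10), // seconds to wait for the operation commit
        params: Some(CollectionParamsDiff {
            replication_factor: Some(2),
        }),
    }
}
```
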
diff --git a/docs/redoc/master/openapi.json b/docs/redoc/master/openapi.json
index b69750489a5..6e609411f05 100644
--- a/docs/redoc/master/openapi.json
+++ b/docs/redoc/master/openapi.json
@@ -2675,6 +2675,13 @@
"format": "uint32",
"minimum": 1
},
+ "replication_factor": {
+ "description": "Number of replicas for each shard",
+ "default": 1,
+ "type": "integer",
+ "format": "uint32",
+ "minimum": 1
+ },
"on_disk_payload": {
"description": "If true - point's payload will not be stored in memory. It will be read from the disk every time it is requested. This setting saves RAM by (slightly) increasing the response time. Note: those payload values that are involved in filtering and are indexed - remain in RAM.",
"default": false,
@@ -3833,6 +3840,14 @@
"minimum": 0,
"nullable": true
},
+ "replication_factor": {
+ "description": "Number of shards replicas. Default is 1 Minimum is 1",
+ "default": null,
+ "type": "integer",
+ "format": "uint32",
+ "minimum": 0,
+ "nullable": true
+ },
"on_disk_payload": {
"description": "If true - point's payload will not be stored in memory. It will be read from the disk every time it is requested. This setting saves RAM by (slightly) increasing the response time. Note: those payload values that are involved in filtering and are indexed - remain in RAM.",
"default": null,
@@ -3993,6 +4008,29 @@
"nullable": true
}
]
+ },
+ "params": {
+ "description": "Collection base params. If none - values from service configuration file are used.",
+ "anyOf": [
+ {
+ "$ref": "#/components/schemas/CollectionParamsDiff"
+ },
+ {
+ "nullable": true
+ }
+ ]
+ }
+ }
+ },
+ "CollectionParamsDiff": {
+ "type": "object",
+ "properties": {
+ "replication_factor": {
+ "description": "Number of replicas for each shard",
+ "type": "integer",
+ "format": "uint32",
+ "minimum": 1,
+ "nullable": true
}
}
},
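
On the REST side the same capability shows up as an optional `replication_factor` when creating a collection and a `params` diff when patching one. A hedged sketch of the corresponding request bodies built with `serde_json` (the `PUT`/`PATCH /collections/{collection_name}` routes and the exact `vectors` shape are assumed from the existing REST API, not introduced by this patch):

```rust
use serde_json::json;

fn main() {
    // Body for PUT /collections/{collection_name}:
    // create a collection with 6 shards and 2 replicas per shard.
    let create_body = json!({
        "vectors": { "size": 4, "distance": "Dot" },
        "shard_number": 6,
        "replication_factor": 2
    });

    // Body for PATCH /collections/{collection_name}: only the diff is sent.
    let update_body = json!({
        "params": { "replication_factor": 2 }
    });

    println!("{create_body}\n{update_body}");
}
```
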
diff --git a/lib/api/src/grpc/proto/collections.proto b/lib/api/src/grpc/proto/collections.proto
index 405aa3cc6be..da5cfcdd9c1 100644
--- a/lib/api/src/grpc/proto/collections.proto
+++ b/lib/api/src/grpc/proto/collections.proto
@@ -157,12 +157,14 @@ message CreateCollection {
optional bool on_disk_payload = 8; // If true - point's payload will not be stored in memory
optional uint64 timeout = 9; // Wait timeout for operation commit in seconds, if not specified - default value will be supplied
optional VectorsConfig vectors_config = 10; // Configuration for vectors
+ optional uint32 replication_factor = 11; // Number of replicas of each shard that the network tries to maintain, default = 1
}
message UpdateCollection {
string collection_name = 1; // Name of the collection
optional OptimizersConfigDiff optimizers_config = 2; // New configuration parameters for the collection
optional uint64 timeout = 3; // Wait timeout for operation commit in seconds, if not specified - default value will be supplied
+ optional CollectionParamsDiff params = 4; // New configuration parameters for the collection
}
message DeleteCollection {
@@ -181,6 +183,11 @@ message CollectionParams {
uint32 shard_number = 3; // Number of shards in collection
bool on_disk_payload = 4; // If true - point's payload will not be stored in memory
optional VectorsConfig vectors_config = 5; // Configuration for vectors
+ uint32 replication_factor = 6; // Number of replicas of each shard that the network tries to maintain
+}
+
+message CollectionParamsDiff {
+ optional uint32 replication_factor = 1; // Number of replicas of each shard that the network tries to maintain
}
message CollectionConfig {
diff --git a/lib/api/src/grpc/qdrant.rs b/lib/api/src/grpc/qdrant.rs
index c94eded4645..0a8bac9fd1d 100644
--- a/lib/api/src/grpc/qdrant.rs
+++ b/lib/api/src/grpc/qdrant.rs
@@ -175,6 +175,9 @@ pub struct CreateCollection {
/// Configuration for vectors
#[prost(message, optional, tag="10")]
pub vectors_config: ::core::option::Option<VectorsConfig>,
+ /// Number of replicas of each shard that the network tries to maintain, default = 1
+ #[prost(uint32, optional, tag="11")]
+ pub replication_factor: ::core::option::Option<u32>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateCollection {
@@ -187,6 +190,9 @@ pub struct UpdateCollection {
/// Wait timeout for operation commit in seconds, if not specified - default value will be supplied
#[prost(uint64, optional, tag="3")]
pub timeout: ::core::option::Option<u64>,
+ /// New configuration parameters for the collection
+ #[prost(message, optional, tag="4")]
+ pub params: ::core::option::Option<CollectionParamsDiff>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DeleteCollection {
@@ -217,6 +223,15 @@ pub struct CollectionParams {
/// Configuration for vectors
#[prost(message, optional, tag="5")]
pub vectors_config: ::core::option::Option<VectorsConfig>,
+ /// Number of replicas of each shard that the network tries to maintain
+ #[prost(uint32, tag="6")]
+ pub replication_factor: u32,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct CollectionParamsDiff {
+ /// Number of replicas of each shard that the network tries to maintain
+ #[prost(uint32, optional, tag="1")]
+ pub replication_factor: ::core::option::Option<u32>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CollectionConfig {
diff --git a/lib/collection/src/collection.rs b/lib/collection/src/collection.rs
index 8ad6408a7ea..e54c7b84919 100644
--- a/lib/collection/src/collection.rs
+++ b/lib/collection/src/collection.rs
@@ -1,7 +1,7 @@
use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::future::Future;
-use std::num::NonZeroU32;
+use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@@ -35,11 +35,11 @@ use crate::operations::types::{
};
use crate::operations::{CollectionUpdateOperations, Validate};
use crate::optimizers_builder::OptimizersConfig;
-use crate::shard::collection_shard_distribution::CollectionShardDistribution;
+use crate::shard::collection_shard_distribution::{self, CollectionShardDistribution};
use crate::shard::local_shard::LocalShard;
use crate::shard::remote_shard::RemoteShard;
use crate::shard::replica_set::ReplicaSet;
-use crate::shard::shard_config::{ShardConfig, ShardType};
+use crate::shard::shard_config::{self, ShardConfig};
use crate::shard::shard_holder::{LockedShardHolder, ShardHolder};
use crate::shard::shard_versioning::versioned_shard_path;
use crate::shard::transfer::shard_transfer::{
@@ -80,8 +80,10 @@ impl Collection {
self.id.clone()
}
+ #[allow(clippy::too_many_arguments)]
pub async fn new(
id: CollectionId,
+ this_peer_id: PeerId,
path: &Path,
snapshots_path: &Path,
config: &CollectionConfig,
@@ -97,38 +99,53 @@ impl Collection {
let mut shard_holder = ShardHolder::new(path, HashRing::fair(HASH_RING_SHARD_SCALE))?;
let shared_config = Arc::new(RwLock::new(config.clone()));
- for shard_id in shard_distribution.local {
- let shard_path = create_shard_dir(path, shard_id).await;
- let shard = match shard_path {
- Ok(shard_path) => {
- LocalShard::build(shard_id, id.clone(), &shard_path, shared_config.clone())
- .await
+ for (shard_id, shard_type) in shard_distribution.shards {
+ let shard = match shard_type {
+ collection_shard_distribution::ShardType::Local => {
+ let shard_path = create_shard_dir(path, shard_id).await;
+ let shard = match shard_path {
+ Ok(shard_path) => {
+ LocalShard::build(
+ shard_id,
+ id.clone(),
+ &shard_path,
+ shared_config.clone(),
+ )
+ .await
+ }
+ Err(e) => Err(e),
+ };
+ shard.map(Shard::Local)
}
- Err(e) => Err(e),
- };
-
- let shard = match shard {
- Ok(shard) => shard,
- Err(err) => {
- shard_holder.before_drop().await;
- return Err(err);
+ collection_shard_distribution::ShardType::Remote(peer_id) => {
+ create_shard_dir(path, shard_id)
+ .await
+ .and_then(|shard_path| {
+ RemoteShard::init(
+ shard_id,
+ id.clone(),
+ peer_id,
+ shard_path,
+ channel_service.clone(),
+ )
+ })
+ .map(Shard::Remote)
}
- };
- shard_holder.add_shard(shard_id, Shard::Local(shard));
- }
-
- for (shard_id, peer_id) in shard_distribution.remote {
- let shard = create_shard_dir(path, shard_id)
- .await
- .and_then(|shard_path| {
- RemoteShard::init(
+ collection_shard_distribution::ShardType::ReplicaSet { local, remote } => {
+ ReplicaSet::build(
shard_id,
id.clone(),
- peer_id,
- shard_path,
- channel_service.clone(),
+ this_peer_id,
+ local,
+ remote,
+ on_replica_failure.clone(),
+ path,
+ shared_config.clone(),
)
- });
+ .await
+ .map(Shard::ReplicaSet)
+ }
+ };
let shard = match shard {
Ok(shard) => shard,
Err(err) => {
@@ -136,22 +153,7 @@ impl Collection {
return Err(err);
}
};
- shard_holder.add_shard(shard_id, Shard::Remote(shard));
- }
- // This is a stub to check that types and hidden lifetimes fit
- // It might be worth leaving it here for now until we have an actual impl
- // TODO: Create ReplicaSet shards
- if false {
- let shard = ReplicaSet::new(
- 1,
- 1,
- Default::default(),
- Default::default(),
- Default::default(),
- 0.0,
- on_replica_failure,
- );
- shard_holder.add_shard(0, Shard::ReplicaSet(shard))
+ shard_holder.add_shard(shard_id, shard);
}
let locked_shard_holder = Arc::new(LockedShardHolder::new(shard_holder));
@@ -1037,24 +1039,22 @@ impl Collection {
Ok(points)
}
+ #[allow(unreachable_code, clippy::diverging_sub_expression)]
pub async fn update_params_from_diff(
&self,
params_diff: CollectionParamsDiff,
) -> CollectionResult<()> {
let mut config = self.config.write().await;
- let old_repl_factor = config.params.replication_factor;
config.params = params_diff.update(&config.params)?;
- self.handle_repl_factor_change(old_repl_factor, config.params.replication_factor);
+ self.handle_replica_changes(todo!("supply replica changes"));
Ok(())
}
- pub fn handle_repl_factor_change(&self, old: NonZeroU32, new: NonZeroU32) {
- if old != new {
- // TODO: remove or add replicas. In case of replica addition:
- // 1. Create and mark them as inactive
- // 2. Copy data
- // 3. Mark them as active
- }
+ pub fn handle_replica_changes(&self, _replica_changes: HashSet<replica_set::Change>) {
+ // TODO: remove or add replicas. In case of replica addition:
+ // 1. Create and mark them as inactive
+ // 2. Copy data
+ // 3. Mark them as active
}
/// Updates shard optimization params:
@@ -1240,7 +1240,7 @@ impl Collection {
.map(|(shard_id, shard)| {
let shard_info = match shard {
Shard::ReplicaSet(replicas) => ShardInfo::ReplicaSet {
- replicas: replicas.replica_state.clone(),
+ replicas: replicas.replica_state.deref().clone(),
},
shard => ShardInfo::Single(
*shard
@@ -1370,9 +1370,11 @@ impl Collection {
let shard_config_opt = ShardConfig::load(&shard_path)?;
if let Some(shard_config) = shard_config_opt {
match shard_config.r#type {
- ShardType::Local => LocalShard::restore_snapshot(&shard_path)?,
- ShardType::Remote { .. } => RemoteShard::restore_snapshot(&shard_path),
- ShardType::Temporary => {}
+ shard_config::ShardType::Local => LocalShard::restore_snapshot(&shard_path)?,
+ shard_config::ShardType::Remote { .. } => {
+ RemoteShard::restore_snapshot(&shard_path)
+ }
+ shard_config::ShardType::Temporary => {}
}
} else {
return Err(CollectionError::service_error(format!(
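
`handle_replica_changes` stays a stub here; the TODO above spells out the intended algorithm (create new replicas as inactive, copy data, then activate). A rough, purely illustrative sketch of how the `Change` set introduced in `replica_set.rs` might eventually be consumed — the helper below is hypothetical and not part of this patch:

```rust
use crate::shard::replica_set::Change;

// Hypothetical helper, not part of this patch: walks the computed changes
// and applies the add/remove steps described in the TODO above.
fn apply_replica_changes(changes: impl IntoIterator<Item = Change>) {
    for change in changes {
        match change {
            Change::Add(shard_id, peer_id) => {
                // 1. create the replica on `peer_id` and mark it inactive
                // 2. transfer the shard data to it
                // 3. mark the replica as active
                let _ = (shard_id, peer_id);
            }
            Change::Remove(shard_id, peer_id) => {
                // drop the replica and remove it from the persisted replica state
                let _ = (shard_id, peer_id);
            }
        }
    }
}
```
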
diff --git a/lib/collection/src/collection_state.rs b/lib/collection/src/collection_state.rs
index 614b0ada49a..2c0d46e8478 100644
--- a/lib/collection/src/collection_state.rs
+++ b/lib/collection/src/collection_state.rs
@@ -64,6 +64,7 @@ impl State {
Ok(())
}
+ #[allow(unreachable_code, clippy::diverging_sub_expression)]
async fn apply_config(
new_config: CollectionConfig,
collection: &Collection,
@@ -74,9 +75,8 @@ impl State {
.await?;
// updating replication factor
let mut config = collection.config.write().await;
- let old_repl_factor = config.params.replication_factor;
config.params.replication_factor = new_config.params.replication_factor;
- collection.handle_repl_factor_change(old_repl_factor, config.params.replication_factor);
+ collection.handle_replica_changes(todo!("Calculate and add changes"));
Ok(())
}
diff --git a/lib/collection/src/config.rs b/lib/collection/src/config.rs
index 892d02c87c2..e0e610cb985 100644
--- a/lib/collection/src/config.rs
+++ b/lib/collection/src/config.rs
@@ -53,8 +53,6 @@ pub struct CollectionParams {
#[serde(default = "default_shard_number")]
pub shard_number: NonZeroU32,
/// Number of replicas for each shard
- // TODO: do not skip in v1.0 (when replication ships)
- #[serde(skip)]
#[serde(default = "default_replication_factor")]
pub replication_factor: NonZeroU32,
/// If true - point's payload will not be stored in memory.
diff --git a/lib/collection/src/operations/conversions.rs b/lib/collection/src/operations/conversions.rs
index a8ca9a1950a..7553c8923e6 100644
--- a/lib/collection/src/operations/conversions.rs
+++ b/lib/collection/src/operations/conversions.rs
@@ -7,10 +7,8 @@ use segment::data_types::vectors::{NamedVector, VectorStruct, DEFAULT_VECTOR_NAM
use segment::types::Distance;
use tonic::Status;
-use crate::config::{
- default_replication_factor, CollectionConfig, CollectionParams, VectorParams, VectorsConfig,
- WalConfig,
-};
+use super::config_diff::CollectionParamsDiff;
+use crate::config::{CollectionConfig, CollectionParams, VectorParams, VectorsConfig, WalConfig};
use crate::operations::config_diff::{HnswConfigDiff, OptimizersConfigDiff, WalConfigDiff};
use crate::operations::point_ops::PointsSelector::PointIdsSelector;
use crate::operations::point_ops::{
@@ -42,6 +40,22 @@ impl From<api::grpc::qdrant::WalConfigDiff> for WalConfigDiff {
}
}
+impl TryFrom<api::grpc::qdrant::CollectionParamsDiff> for CollectionParamsDiff {
+ type Error = Status;
+
+ fn try_from(value: api::grpc::qdrant::CollectionParamsDiff) -> Result<Self, Self::Error> {
+ Ok(Self {
+ replication_factor: value
+ .replication_factor
+ .map(|factor| {
+ NonZeroU32::new(factor)
+ .ok_or_else(|| Status::invalid_argument("`replication_factor` cannot be 0"))
+ })
+ .transpose()?,
+ })
+ }
+}
+
impl From<api::grpc::qdrant::OptimizersConfigDiff> for OptimizersConfigDiff {
fn from(value: api::grpc::qdrant::OptimizersConfigDiff) -> Self {
Self {
@@ -115,6 +129,7 @@ impl From<CollectionInfo> for api::grpc::qdrant::CollectionInfo {
Some(api::grpc::qdrant::VectorsConfig { config })
},
shard_number: config.params.shard_number.get(),
+ replication_factor: config.params.replication_factor.get(),
on_disk_payload: config.params.on_disk_payload,
}),
hnsw_config: Some(api::grpc::qdrant::HnswConfigDiff {
@@ -243,43 +258,40 @@ impl TryFrom<api::grpc::qdrant::CollectionConfig> for CollectionConfig {
Ok(Self {
params: match config.params {
None => return Err(Status::invalid_argument("Malformed CollectionParams type")),
- Some(params) => {
- CollectionParams {
- vectors: match params.vectors_config {
+ Some(params) => CollectionParams {
+ vectors: match params.vectors_config {
+ None => {
+ return Err(Status::invalid_argument(
+ "Expected `vectors` - configuration for vector storage",
+ ))
+ }
+ Some(vector_config) => match vector_config.config {
None => {
return Err(Status::invalid_argument(
"Expected `vectors` - configuration for vector storage",
))
}
- Some(vector_config) => match vector_config.config {
- None => {
- return Err(Status::invalid_argument(
- "Expected `vectors` - configuration for vector storage",
- ))
- }
- Some(api::grpc::qdrant::vectors_config::Config::Params(params)) => {
- VectorsConfig::Single(params.try_into()?)
- }
- Some(api::grpc::qdrant::vectors_config::Config::ParamsMap(
- params_map,
- )) => VectorsConfig::Multi(
- params_map
- .map
- .into_iter()
- .map(|(k, v)| Ok((k, v.try_into()?)))
- .collect::<Result<BTreeMap<_, _>, Status>>(
- )?,
- ),
- },
+ Some(api::grpc::qdrant::vectors_config::Config::Params(params)) => {
+ VectorsConfig::Single(params.try_into()?)
+ }
+ Some(api::grpc::qdrant::vectors_config::Config::ParamsMap(
+ params_map,
+ )) => VectorsConfig::Multi(
+ params_map
+ .map
+ .into_iter()
+ .map(|(k, v)| Ok((k, v.try_into()?)))
+ .collect::<Result<BTreeMap<_, _>, Status>>()?,
+ ),
},
- shard_number: NonZeroU32::new(params.shard_number).ok_or_else(|| {
- Status::invalid_argument("`shard_number` cannot be zero")
- })?,
- on_disk_payload: params.on_disk_payload,
- // TODO: use `repliction_factor` from `config`
- replication_factor: default_replication_factor(),
- }
- }
+ },
+ shard_number: NonZeroU32::new(params.shard_number)
+ .ok_or_else(|| Status::invalid_argument("`shard_number` cannot be zero"))?,
+ on_disk_payload: params.on_disk_payload,
+ replication_factor: NonZeroU32::new(params.replication_factor).ok_or_else(
+ || Status::invalid_argument("`replication_factor` cannot be zero"),
+ )?,
+ },
},
hnsw_config: match config.hnsw_config {
None => return Err(Status::invalid_argument("Malformed HnswConfig type")),
diff --git a/lib/collection/src/save_on_disk.rs b/lib/collection/src/save_on_disk.rs
index 63b741cd28f..e2bb97b2428 100644
--- a/lib/collection/src/save_on_disk.rs
+++ b/lib/collection/src/save_on_disk.rs
@@ -1,6 +1,6 @@
use std::fs::File;
use std::io::BufWriter;
-use std::ops::Deref;
+use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use atomicwrites::OverwriteBehavior::AllowOverwrite;
@@ -56,7 +56,7 @@ impl<T: Serialize + for<'de> Deserialize<'de>> SaveOnDisk<T> {
Ok(output)
}
- fn save(&self) -> Result<(), Error> {
+ pub fn save(&self) -> Result<(), Error> {
AtomicFile::new(&self.path, AllowOverwrite).write(|file| {
let writer = BufWriter::new(file);
serde_json::to_writer(writer, &self.data)
@@ -73,6 +73,12 @@ impl<T> Deref for SaveOnDisk<T> {
}
}
+impl<T> DerefMut for SaveOnDisk<T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.data
+ }
+}
+
#[cfg(test)]
mod tests {
use std::fs;
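
`SaveOnDisk` gains `DerefMut` and a public `save()`, so besides the closure-based `write`/`write_with_res` used by `ReplicaSet`, callers can mutate the wrapped value in place and persist it explicitly. A rough usage sketch (the `load_or_init` constructor and the crate path are assumed from how the type is used elsewhere in this patch):

```rust
use std::collections::HashMap;
use std::path::PathBuf;

use collection::save_on_disk::SaveOnDisk; // module path assumed

fn example(path: PathBuf) -> Result<(), Box<dyn std::error::Error>> {
    // Persisted replica state: peer id -> is_active.
    let mut state: SaveOnDisk<HashMap<u64, bool>> = SaveOnDisk::load_or_init(path)?;

    // Closure-based update: mutation and save in one step.
    state.write(|map| {
        map.insert(1, true);
    })?;

    // DerefMut-based update: mutate through the wrapper, then persist explicitly.
    state.insert(2, false);
    state.save()?;
    Ok(())
}
```
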
diff --git a/lib/collection/src/shard/collection_shard_distribution.rs b/lib/collection/src/shard/collection_shard_distribution.rs
index d2dee49184e..1df829f4902 100644
--- a/lib/collection/src/shard/collection_shard_distribution.rs
+++ b/lib/collection/src/shard/collection_shard_distribution.rs
@@ -1,55 +1,69 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use crate::collection_state::ShardInfo;
use crate::shard::{PeerId, ShardId};
+#[derive(Debug)]
+pub enum ShardType {
+ Local,
+ Remote(PeerId),
+ ReplicaSet {
+ local: bool,
+ remote: HashSet<PeerId>,
+ },
+}
+
#[derive(Debug)]
pub struct CollectionShardDistribution {
- pub local: Vec<ShardId>,
- pub remote: Vec<(ShardId, PeerId)>,
+ pub shards: HashMap<ShardId, ShardType>,
}
impl CollectionShardDistribution {
- pub fn new(local: Vec<ShardId>, remote: Vec<(ShardId, PeerId)>) -> Self {
- Self { local, remote }
- }
-
pub fn all_local(shard_number: Option<u32>) -> Self {
Self {
// This method is called only when distributed deployment is disabled
// so if not specified it will suggest 1 shard per collection for better performance.
- local: (0..shard_number.unwrap_or(1)).collect(),
- remote: vec![],
+ shards: (0..shard_number.unwrap_or(1))
+ .map(|shard_id| (shard_id, ShardType::Local))
+ .collect(),
}
}
- pub fn from_shard_to_peer(this_peer: PeerId, shard_to_peer: &HashMap<ShardId, PeerId>) -> Self {
- let local = shard_to_peer
- .iter()
- .filter_map(|(shard, peer)| {
- if peer == &this_peer {
- Some(*shard)
- } else {
- None
- }
- })
- .collect();
-
- let remote = shard_to_peer
- .iter()
- .filter_map(|(&shard, &peer)| {
- if peer != this_peer {
- Some((shard, peer))
- } else {
- None
- }
- })
- .clone()
- .collect();
-
- Self { local, remote }
+ pub fn from_shards_info(this_peer: PeerId, shards_info: HashMap<ShardId, ShardInfo>) -> Self {
+ Self {
+ shards: shards_info
+ .into_iter()
+ .map(|(shard_id, info)| match info {
+ ShardInfo::ReplicaSet { replicas } => (
+ shard_id,
+ ShardType::ReplicaSet {
+ local: replicas.contains_key(&this_peer),
+ remote: replicas.into_keys().filter(|id| id != &this_peer).collect(),
+ },
+ ),
+ ShardInfo::Single(peer_id) => {
+ if peer_id == this_peer {
+ (shard_id, ShardType::Local)
+ } else {
+ (shard_id, ShardType::Remote(peer_id))
+ }
+ }
+ })
+ .collect(),
+ }
}
pub fn shard_count(&self) -> usize {
- self.local.len() + self.remote.len()
+ self.shards.len()
+ }
+
+ pub fn shard_replica_count(&self) -> usize {
+ self.shards
+ .values()
+ .map(|shard| match shard {
+ ShardType::ReplicaSet { local, remote } => *local as usize + remote.len(),
+ _ => 1,
+ })
+ .sum()
}
}
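
The distribution now lives in a single `shards` map, and `shard_replica_count` counts replica-set members individually. A short sketch of building a mixed distribution and what the two counters report (the import path matches the one used in `shard_distribution.rs` below):

```rust
use std::collections::{HashMap, HashSet};

use collection::shard::collection_shard_distribution::{CollectionShardDistribution, ShardType};

fn main() {
    let mut shards = HashMap::new();
    shards.insert(0, ShardType::Local);
    shards.insert(1, ShardType::Remote(10_000));
    shards.insert(
        2,
        ShardType::ReplicaSet {
            local: true,
            remote: HashSet::from([10_000, 10_001]),
        },
    );
    let distribution = CollectionShardDistribution { shards };

    assert_eq!(distribution.shard_count(), 3); // three shards in the collection...
    assert_eq!(distribution.shard_replica_count(), 5); // ...spread over five shard replicas
}
```
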
diff --git a/lib/collection/src/shard/replica_set.rs b/lib/collection/src/shard/replica_set.rs
index bc8144c9023..4c484a80825 100644
--- a/lib/collection/src/shard/replica_set.rs
+++ b/lib/collection/src/shard/replica_set.rs
@@ -1,6 +1,8 @@
-use std::collections::HashMap;
+use std::cmp;
+use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::ops::Deref;
+use std::path::Path;
use std::sync::Arc;
use futures::future::{try_join, try_join_all};
@@ -10,53 +12,95 @@ use segment::types::{
ExtendedPointId, Filter, ScoredPoint, WithPayload, WithPayloadInterface, WithVector,
};
use tokio::runtime::Handle;
+use tokio::sync::RwLock;
use super::local_shard::{drop_and_delete_from_disk, LocalShard};
use super::remote_shard::RemoteShard;
-use super::{PeerId, ShardId, ShardOperation};
+use super::{create_shard_dir, CollectionId, PeerId, ShardId, ShardOperation};
+use crate::config::CollectionConfig;
use crate::operations::types::{
CollectionError, CollectionInfo, CollectionResult, CountRequest, CountResult, PointRequest,
Record, SearchRequestBatch, UpdateResult,
};
use crate::operations::CollectionUpdateOperations;
+use crate::save_on_disk::SaveOnDisk;
pub type IsActive = bool;
pub type OnPeerFailure =
- Box<dyn Fn(PeerId, ShardId) -> Box<dyn Future<Output = ()> + Send> + Send + Sync>;
+ Arc<dyn Fn(PeerId, ShardId) -> Box<dyn Future<Output = ()> + Send> + Send + Sync>;
+
+const READ_REMOTE_REPLICAS: u32 = 2;
+
+const REPLICA_STATE_FILE: &str = "replica_state";
+
+/// Represents a change in replica set, due to scaling of `replication_factor`
+pub enum Change {
+ Add(ShardId, PeerId),
+ Remove(ShardId, PeerId),
+}
/// A set of shard replicas.
/// Handles operations so that the state is consistent across all the replicas of the shard.
/// Prefers local shard for read-only operations.
/// Perform updates on all replicas and report error if there is at least one failure.
+///
+/// `ReplicaSet` should always have >= 2 replicas.
+/// If a user decreases replication factor to 1 - it should be converted to just `Local` or `Remote` shard.
pub struct ReplicaSet {
shard_id: ShardId,
this_peer_id: PeerId,
local: Option<LocalShard>,
remotes: Vec<RemoteShard>,
- pub(crate) replica_state: HashMap<PeerId, IsActive>,
- read_fan_out_ratio: f32,
+ pub(crate) replica_state: SaveOnDisk<HashMap<PeerId, IsActive>>,
+ /// Number of remote replicas to send read requests to.
+ /// If the actual number of peers is less than this, the read request will be sent to all of them.
+ read_remote_replicas: u32,
notify_peer_failure_cb: OnPeerFailure,
}
impl ReplicaSet {
- pub fn new(
+ #[allow(clippy::too_many_arguments)]
+ pub async fn build(
shard_id: ShardId,
+ collection_id: CollectionId,
this_peer_id: PeerId,
- local: Option<LocalShard>,
- remotes: Vec<RemoteShard>,
- replica_state: HashMap<PeerId, IsActive>,
- read_fan_out_ratio: f32,
+ local: bool,
+ remotes: HashSet<PeerId>,
on_peer_failure: OnPeerFailure,
- ) -> Self {
- Self {
+ collection_path: &Path,
+ shared_config: Arc<RwLock<CollectionConfig>>,
+ ) -> CollectionResult<Self> {
+ let shard_path = create_shard_dir(collection_path, shard_id).await?;
+ let local = if local {
+ let shard =
+ LocalShard::build(shard_id, collection_id, &shard_path, shared_config.clone())
+ .await?;
+ Some(shard)
+ } else {
+ None
+ };
+ let mut replica_state: SaveOnDisk<HashMap<PeerId, IsActive>> =
+ SaveOnDisk::load_or_init(shard_path.join(REPLICA_STATE_FILE))?;
+ replica_state.write(|rs| {
+ if local.is_some() {
+ rs.insert(this_peer_id, true);
+ }
+ for peer in remotes {
+ rs.insert(peer, true);
+ }
+ })?;
+ Ok(Self {
shard_id,
this_peer_id,
local,
- remotes,
+ // TODO: Initialize remote shards
+ // This requires logic to store several peer ids in remote shard file
+ remotes: Vec::new(),
replica_state,
- read_fan_out_ratio,
+ // TODO: move to collection config
+ read_remote_replicas: READ_REMOTE_REPLICAS,
notify_peer_failure_cb: on_peer_failure,
- }
+ })
}
pub async fn notify_peer_failure(&self, peer_id: PeerId) {
Box::into_pin(self.notify_peer_failure_cb.deref()(peer_id, self.shard_id)).await
@@ -67,12 +111,13 @@ impl ReplicaSet {
}
pub fn set_active(&mut self, peer_id: &PeerId, active: bool) -> CollectionResult<()> {
- *self
- .replica_state
- .get_mut(peer_id)
- .ok_or_else(|| CollectionError::NotFound {
- what: format!("Shard {} replica on peer {peer_id}", self.shard_id),
- })? = active;
+ self.replica_state.write_with_res(|rs| {
+ *rs.get_mut(peer_id)
+ .ok_or_else(|| CollectionError::NotFound {
+ what: format!("Shard {} replica on peer {peer_id}", self.shard_id),
+ })? = active;
+ Ok::<(), CollectionError>(())
+ })?;
Ok(())
}
@@ -110,6 +155,7 @@ impl ReplicaSet {
todo!("Add remote replica")
}
}
+ self.replica_state.save()?;
Ok(())
}
@@ -152,8 +198,10 @@ impl ReplicaSet {
)));
}
- let fan_out_selection =
- (self.read_fan_out_ratio * active_remote_shards.len() as f32).ceil() as usize;
+ let fan_out_selection = cmp::min(
+ active_remote_shards.len(),
+ self.read_remote_replicas as usize,
+ );
let mut futures = FuturesUnordered::new();
for remote in &active_remote_shards[0..fan_out_selection] {
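
The read fan-out changes from a ratio to a fixed cap: at most `READ_REMOTE_REPLICAS` (2) active remote replicas receive the read, or all of them if fewer are active. A worked example of the new selection rule:

```rust
use std::cmp;

const READ_REMOTE_REPLICAS: u32 = 2;

// Mirrors the `fan_out_selection` computation in `ReplicaSet` above.
fn fan_out_selection(active_remote_shards: usize) -> usize {
    cmp::min(active_remote_shards, READ_REMOTE_REPLICAS as usize)
}

fn main() {
    assert_eq!(fan_out_selection(1), 1); // fewer active remotes than the cap: query them all
    assert_eq!(fan_out_selection(4), 2); // otherwise query at most two remote replicas
}
```
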
diff --git a/lib/collection/src/tests/snapshot_test.rs b/lib/collection/src/tests/snapshot_test.rs
index fec5d66fe51..94dbcdc6103 100644
--- a/lib/collection/src/tests/snapshot_test.rs
+++ b/lib/collection/src/tests/snapshot_test.rs
@@ -1,4 +1,6 @@
+use std::collections::HashMap;
use std::num::{NonZeroU32, NonZeroU64};
+use std::sync::Arc;
use segment::types::Distance;
use tempfile::Builder;
@@ -6,7 +8,7 @@ use tempfile::Builder;
use crate::collection::Collection;
use crate::config::{CollectionConfig, CollectionParams, VectorParams, VectorsConfig, WalConfig};
use crate::optimizers_builder::OptimizersConfig;
-use crate::shard::collection_shard_distribution::CollectionShardDistribution;
+use crate::shard::collection_shard_distribution::{self, CollectionShardDistribution};
use crate::shard::replica_set::OnPeerFailure;
use crate::shard::{ChannelService, Shard};
@@ -22,7 +24,7 @@ const TEST_OPTIMIZERS_CONFIG: OptimizersConfig = OptimizersConfig {
};
pub fn dummy_on_replica_failure() -> OnPeerFailure {
- Box::new(move |_peer_id, _shard_id| Box::new(async {}))
+ Arc::new(move |_peer_id, _shard_id| Box::new(async {}))
}
#[tokio::test]
@@ -57,13 +59,18 @@ async fn test_snapshot_collection() {
.unwrap();
let collection_name = "test".to_string();
let collection_name_rec = "test_rec".to_string();
+ let mut shards = HashMap::new();
+ shards.insert(0, collection_shard_distribution::ShardType::Local);
+ shards.insert(1, collection_shard_distribution::ShardType::Local);
+ shards.insert(2, collection_shard_distribution::ShardType::Remote(10_000));
let mut collection = Collection::new(
collection_name,
+ 1,
collection_dir.path(),
snapshots_path.path(),
&config,
- CollectionShardDistribution::new(vec![0, 1], vec![(2, 10000)]),
+ CollectionShardDistribution { shards },
ChannelService::default(),
dummy_on_replica_failure(),
)
diff --git a/lib/collection/tests/common/mod.rs b/lib/collection/tests/common/mod.rs
index 1fc9e840ad5..3211262ba53 100644
--- a/lib/collection/tests/common/mod.rs
+++ b/lib/collection/tests/common/mod.rs
@@ -2,6 +2,7 @@
use std::num::{NonZeroU32, NonZeroU64};
use std::path::Path;
+use std::sync::Arc;
use collection::collection::Collection;
use collection::config::{CollectionConfig, CollectionParams, VectorParams, WalConfig};
@@ -68,7 +69,7 @@ pub async fn simple_collection_fixture(collection_path: &Path, shard_number: u32
}
pub fn dummy_on_replica_failure() -> OnPeerFailure {
- Box::new(move |_peer_id, _shard_id| Box::new(async {}))
+ Arc::new(move |_peer_id, _shard_id| Box::new(async {}))
}
/// Default to a collection with all the shards local
@@ -80,6 +81,7 @@ pub async fn new_local_collection(
) -> Result<Collection, CollectionError> {
Collection::new(
id,
+ 0,
path,
snapshots_path,
config,
diff --git a/lib/storage/src/content_manager/collection_meta_ops.rs b/lib/storage/src/content_manager/collection_meta_ops.rs
index 5e37c8a1422..0dae3ca1b8f 100644
--- a/lib/storage/src/content_manager/collection_meta_ops.rs
+++ b/lib/storage/src/content_manager/collection_meta_ops.rs
@@ -1,5 +1,7 @@
use collection::config::VectorsConfig;
-use collection::operations::config_diff::{HnswConfigDiff, OptimizersConfigDiff, WalConfigDiff};
+use collection::operations::config_diff::{
+ CollectionParamsDiff, HnswConfigDiff, OptimizersConfigDiff, WalConfigDiff,
+};
use collection::shard::{CollectionId, PeerId, ShardId, ShardTransfer};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
@@ -92,6 +94,11 @@ pub struct CreateCollection {
/// Minimum is 1
#[serde(default)]
pub shard_number: Option<u32>,
+ /// Number of shard replicas.
+ /// Default is 1
+ /// Minimum is 1
+ #[serde(default)]
+ pub replication_factor: Option<u32>,
/// If true - point's payload will not be stored in memory.
/// It will be read from the disk every time it is requested.
/// This setting saves RAM by (slightly) increasing the response time.
@@ -107,7 +114,7 @@ pub struct CreateCollection {
}
/// Operation for creating new collection and (optionally) specify index params
-#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)]
+#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "snake_case")]
pub struct CreateCollectionOperation {
pub collection_name: String,
@@ -122,10 +129,12 @@ pub struct UpdateCollection {
/// Custom params for Optimizers. If none - values from service configuration file are used.
/// This operation is blocking, it will only proceed ones all current optimizations are complete
pub optimizers_config: Option<OptimizersConfigDiff>, // ToDo: Allow updates for other configuration params as well
+ /// Collection base params. If none - values from service configuration file are used.
+ pub params: Option<CollectionParamsDiff>,
}
/// Operation for updating parameters of the existing collection
-#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)]
+#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "snake_case")]
pub struct UpdateCollectionOperation {
pub collection_name: String,
@@ -143,11 +152,11 @@ pub struct ChangeAliasesOperation {
}
/// Operation for deleting collection with given name
-#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)]
+#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "snake_case")]
pub struct DeleteCollectionOperation(pub String);
-#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)]
+#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)]
pub enum ShardTransferOperations {
Start(ShardTransfer),
Finish(ShardTransfer),
@@ -158,7 +167,7 @@ pub enum ShardTransferOperations {
}
/// Sets the state of shard replica
-#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)]
+#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)]
pub struct SetShardReplicaState {
pub collection_name: String,
pub shard_id: ShardId,
@@ -168,7 +177,7 @@ pub struct SetShardReplicaState {
}
/// Enumeration of all possible collection update operations
-#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)]
+#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "snake_case")]
pub enum CollectionMetaOperations {
CreateCollection(CreateCollectionOperation),
diff --git a/lib/storage/src/content_manager/consensus_state.rs b/lib/storage/src/content_manager/consensus_state.rs
index 181131e73b0..6876b02d4c9 100644
--- a/lib/storage/src/content_manager/consensus_state.rs
+++ b/lib/storage/src/content_manager/consensus_state.rs
@@ -214,7 +214,10 @@ impl ConsensusState {
}
} else if entry.get_context().is_empty() {
// Allow empty context for compatibility
- // TODO: remove in the next version after 0.10
+ log::warn!(
+ "Outdated peer addition entry found with index: {}",
+ entry.get_index()
+ )
} else {
// Should not be reachable as it is checked in API
return Err(StorageError::ServiceError {
diff --git a/lib/storage/src/content_manager/conversions.rs b/lib/storage/src/content_manager/conversions.rs
index 280ce5dce9a..278e9df9256 100644
--- a/lib/storage/src/content_manager/conversions.rs
+++ b/lib/storage/src/content_manager/conversions.rs
@@ -53,6 +53,7 @@ impl TryFrom<api::grpc::qdrant::CreateCollection> for CollectionMetaOperations {
optimizers_config: value.optimizers_config.map(|v| v.into()),
shard_number: value.shard_number,
on_disk_payload: value.on_disk_payload,
+ replication_factor: value.replication_factor,
},
}))
}
@@ -65,7 +66,8 @@ impl TryFrom<api::grpc::qdrant::UpdateCollection> for CollectionMetaOperations {
Ok(Self::UpdateCollection(UpdateCollectionOperation {
collection_name: value.collection_name,
update_collection: UpdateCollection {
- optimizers_config: value.optimizers_config.map(|v| v.into()),
+ optimizers_config: value.optimizers_config.map(Into::into),
+ params: value.params.map(TryInto::try_into).transpose()?,
},
}))
}
diff --git a/lib/storage/src/content_manager/shard_distribution.rs b/lib/storage/src/content_manager/shard_distribution.rs
index ea453ad0b11..111ec1484ee 100644
--- a/lib/storage/src/content_manager/shard_distribution.rs
+++ b/lib/storage/src/content_manager/shard_distribution.rs
@@ -1,6 +1,8 @@
use std::cmp::Reverse;
use std::collections::BinaryHeap;
+use std::num::NonZeroU32;
+use collection::shard::collection_shard_distribution::{CollectionShardDistribution, ShardType};
use collection::shard::{PeerId, ShardId};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
@@ -12,9 +14,9 @@ struct PeerShardCount {
}
impl PeerShardCount {
- fn new(shard_count: usize, peer_id: PeerId) -> Self {
+ fn new(peer_id: PeerId) -> Self {
Self {
- shard_count,
+ shard_count: 0,
peer_id,
}
}
@@ -26,39 +28,36 @@ impl PeerShardCount {
#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)]
pub struct ShardDistributionProposal {
- pub distribution: Vec<(ShardId, PeerId)>,
+ /// A shard can be located on several peers if it has replicas
+ pub distribution: Vec<(ShardId, Vec<PeerId>)>,
}
impl ShardDistributionProposal {
/// Builds a proposal for the distribution of shards.
- /// It will propose to allocate shards so that all peers have the same number of shards at the end.
+ /// It will propose to allocate shards so that all peers have the same number of shards of this collection at the end.
pub fn new(
- config_shard_number: u32,
+ shard_number: NonZeroU32,
+ replication_factor: NonZeroU32,
known_peers: &[PeerId],
- known_shards: Vec<(ShardId, PeerId)>,
) -> Self {
// min number of shard_count on top to make this a min-heap
let mut min_heap: BinaryHeap<Reverse<PeerShardCount>> =
BinaryHeap::with_capacity(known_peers.len());
-
- // count number of existing shards per peers
- for &peer in known_peers {
- let shard_count_on_peer = known_shards
- .iter()
- .filter(|(_shard_id, peer_id)| *peer_id == peer)
- .count();
- min_heap.push(Reverse(PeerShardCount::new(shard_count_on_peer, peer)))
+ for peer in known_peers {
+ min_heap.push(Reverse(PeerShardCount::new(*peer)));
}
- let mut distribution: Vec<(ShardId, PeerId)> =
- Vec::with_capacity(config_shard_number as usize);
-
- // propose the peer with the least amount of existing shards to host the next shard
- for shard_id in 0..config_shard_number {
- let mut least_loaded_peer = min_heap.peek_mut().unwrap();
- let selected_peer = least_loaded_peer.0.peer_id;
- least_loaded_peer.0.inc_shard_count();
- distribution.push((shard_id, selected_peer));
+ let mut distribution = Vec::with_capacity(shard_number.get() as usize);
+
+ for shard_id in 0..shard_number.get() {
+ let mut replicas = Vec::new();
+ for _replica in 0..replication_factor.get() {
+ let mut least_loaded_peer = min_heap.peek_mut().unwrap();
+ let selected_peer = least_loaded_peer.0.peer_id;
+ least_loaded_peer.0.inc_shard_count();
+ replicas.push(selected_peer);
+ }
+ distribution.push((shard_id, replicas))
}
Self { distribution }
@@ -67,25 +66,48 @@ impl ShardDistributionProposal {
pub fn local_shards_for(&self, peer_id: PeerId) -> Vec<ShardId> {
self.distribution
.iter()
- .filter_map(
- |(shard, peer)| {
- if peer == &peer_id {
- Some(*shard)
- } else {
- None
- }
- },
- )
+ .filter_map(|(shard, peers)| {
+ if peers.contains(&peer_id) {
+ Some(shard)
+ } else {
+ None
+ }
+ })
+ .copied()
.collect()
}
- pub fn remote_shards_for(&self, peer_id: PeerId) -> Vec<(ShardId, PeerId)> {
+ pub fn remote_shards_for(&self, peer_id: PeerId) -> Vec<(ShardId, Vec<PeerId>)> {
self.distribution
.iter()
- .filter(|(_shard, peer)| peer != &peer_id)
- .copied()
+ .filter(|(_shard, peers)| !peers.contains(&peer_id))
+ .cloned()
.collect()
}
+
+ pub fn into(self, this_peer_id: PeerId) -> CollectionShardDistribution {
+ CollectionShardDistribution {
+ shards: self
+ .distribution
+ .into_iter()
+ .map(|(shard_id, peers)| match &peers[..] {
+ [peer] if *peer == this_peer_id => (shard_id, ShardType::Local),
+ [peer] => (shard_id, ShardType::Remote(*peer)),
+ peers => (
+ shard_id,
+ ShardType::ReplicaSet {
+ local: peers.contains(&this_peer_id),
+ remote: peers
+ .iter()
+ .copied()
+ .filter(|peer| peer != &this_peer_id)
+ .collect(),
+ },
+ ),
+ })
+ .collect(),
+ }
+ }
}
#[cfg(test)]
@@ -95,18 +117,24 @@ mod tests {
#[test]
fn test_distribution() {
let known_peers = vec![1, 2, 3, 4];
- let distribution = ShardDistributionProposal::new(6, &known_peers, vec![]);
+ let distribution = ShardDistributionProposal::new(
+ NonZeroU32::new(6).unwrap(),
+ NonZeroU32::new(1).unwrap(),
+ &known_peers,
+ );
// Check it distribution is as even as possible
let mut shard_counts: Vec<usize> = vec![0; known_peers.len()];
- for (_shard_id, peer_id) in &distribution.distribution {
- let peer_offset = known_peers
- .iter()
- .enumerate()
- .find(|(_, x)| *x == peer_id)
- .unwrap()
- .0;
- shard_counts[peer_offset] += 1;
+ for (_shard_id, peers) in &distribution.distribution {
+ for peer_id in peers {
+ let peer_offset = known_peers
+ .iter()
+ .enumerate()
+ .find(|(_, x)| *x == peer_id)
+ .unwrap()
+ .0;
+ shard_counts[peer_offset] += 1;
+ }
}
assert_eq!(shard_counts.iter().sum::<usize>(), 6);
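
With replication the proposal now places `shard_number * replication_factor` replicas, always giving the next replica to the currently least-loaded peer, so per-peer counts stay as even as possible. A sketch extending the test above to a replication factor of 2 (6 shards x 2 replicas over 4 peers = 12 placements, i.e. 3 per peer):

```rust
use std::num::NonZeroU32;

use storage::content_manager::shard_distribution::ShardDistributionProposal; // path assumed

fn main() {
    let known_peers = vec![1, 2, 3, 4];
    let proposal = ShardDistributionProposal::new(
        NonZeroU32::new(6).unwrap(),
        NonZeroU32::new(2).unwrap(),
        &known_peers,
    );

    // 6 shards with 2 replicas each -> 12 placements in total.
    let total: usize = proposal
        .distribution
        .iter()
        .map(|(_, peers)| peers.len())
        .sum();
    assert_eq!(total, 12);
}
```
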
diff --git a/lib/storage/src/content_manager/toc.rs b/lib/storage/src/content_manager/toc.rs
index ca0d647cc73..e21ac6ca65d 100644
--- a/lib/storage/src/content_manager/toc.rs
+++ b/lib/storage/src/content_manager/toc.rs
@@ -7,8 +7,8 @@ use std::sync::Arc;
use collection::collection::Collection;
use collection::collection_state;
use collection::collection_state::ShardInfo;
-use collection::config::{CollectionConfig, CollectionParams};
-use collection::operations::config_diff::{CollectionParamsDiff, DiffConfig};
+use collection::config::{default_replication_factor, CollectionConfig, CollectionParams};
+use collection::operations::config_diff::DiffConfig;
use collection::operations::snapshot_ops::SnapshotDescription;
use collection::operations::types::{
CountRequest, CountResult, PointRequest, RecommendRequest, RecommendRequestBatch, Record,
@@ -208,6 +208,7 @@ impl TableOfContent {
hnsw_config: hnsw_config_diff,
wal_config: wal_config_diff,
optimizers_config: optimizers_config_diff,
+ replication_factor,
} = operation;
self.collections
@@ -226,6 +227,8 @@ impl TableOfContent {
"If shard number was supplied then this exact number should be used in a distribution"
)
}
+ let replication_factor =
+ replication_factor.unwrap_or_else(|| default_replication_factor().get());
let collection_params = CollectionParams {
vectors,
@@ -234,8 +237,11 @@ impl TableOfContent {
description: "`shard_number` cannot be 0".to_string(),
})?,
on_disk_payload: on_disk_payload.unwrap_or(self.storage_config.on_disk_payload),
- // TODO: use `replication_factor` supplied in `CreateCollection`
- replication_factor: collection::config::default_replication_factor(),
+ replication_factor: NonZeroU32::new(replication_factor).ok_or(
+ StorageError::BadInput {
+ description: "`replication_factor` cannot be 0".to_string(),
+ },
+ )?,
};
let wal_config = match wal_config_diff {
None => self.storage_config.wal.clone(),
@@ -260,6 +266,7 @@ impl TableOfContent {
};
let collection = Collection::new(
collection_name.to_string(),
+ self.this_peer_id,
&collection_path,
&snapshots_path,
&collection_config,
@@ -279,7 +286,7 @@ impl TableOfContent {
fn on_peer_failure_callback(&self, collection_name: String) -> replica_set::OnPeerFailure {
let proposal_sender = self.consensus_proposal_sender.clone();
- Box::new(move |peer_id, shard_id| {
+ Arc::new(move |peer_id, shard_id| {
let proposal_sender = proposal_sender.clone();
let collection_name = collection_name.clone();
Box::new(async move {
@@ -303,9 +310,10 @@ impl TableOfContent {
collection_name: &str,
operation: UpdateCollection,
) -> Result {
- let UpdateCollection { optimizers_config } = operation;
- // TODO: get `params` from `UpdateCollection`
- let params: Option<CollectionParamsDiff> = None;
+ let UpdateCollection {
+ optimizers_config,
+ params,
+ } = operation;
let collection = self.get_collection(collection_name).await?;
if let Some(diff) = optimizers_config {
collection.update_optimizer_params_from_diff(diff).await?
@@ -392,14 +400,14 @@ impl TableOfContent {
operation: CollectionMetaOperations,
) -> Result {
match operation {
- CollectionMetaOperations::CreateCollectionDistributed(operation, distribution) => {
- let local = distribution.local_shards_for(self.this_peer_id);
- let remote = distribution.remote_shards_for(self.this_peer_id);
- let collection_shard_distribution = CollectionShardDistribution::new(local, remote);
+ CollectionMetaOperations::CreateCollectionDistributed(
+ operation,
+ distribution_proposal,
+ ) => {
self.create_collection(
&operation.collection_name,
operation.create_collection,
- collection_shard_distribution,
+ distribution_proposal.into(self.this_peer_id),
)
.await
}
@@ -835,21 +843,13 @@ impl TableOfContent {
None => {
let collection_path = self.create_collection_path(id).await?;
let snapshots_path = self.create_snapshots_path(id).await?;
- let shard_distribution = CollectionShardDistribution::from_shard_to_peer(
+ let shard_distribution = CollectionShardDistribution::from_shards_info(
self.this_peer_id,
- &state
- .shards
- .iter()
- .map(|(id, info)| match info {
- ShardInfo::ReplicaSet { replicas: _ } => {
- todo!("Handle multiple replicas in shard distribution")
- }
- ShardInfo::Single(peer_id) => (*id, *peer_id),
- })
- .collect(),
+ state.shards.clone(),
);
let collection = Collection::new(
id.to_string(),
+ self.this_peer_id,
&collection_path,
&snapshots_path,
&state.config,
@@ -899,11 +899,12 @@ impl TableOfContent {
pub async fn suggest_shard_distribution(
&self,
op: &CreateCollectionOperation,
- suggested_shard_number: u32,
+ suggested_shard_number: NonZeroU32,
) -> ShardDistributionProposal {
let shard_number = op
.create_collection
.shard_number
+ .and_then(NonZeroU32::new)
.unwrap_or(suggested_shard_number);
let mut known_peers_set: HashSet<_> = self
.channel_service
@@ -914,8 +915,14 @@ impl TableOfContent {
.collect();
known_peers_set.insert(self.this_peer_id());
let known_peers: Vec<_> = known_peers_set.into_iter().collect();
+ let replication_factor = op
+ .create_collection
+ .replication_factor
+ .and_then(NonZeroU32::new)
+ .unwrap_or_else(default_replication_factor);
- let shard_distribution = ShardDistributionProposal::new(shard_number, &known_peers, vec![]);
+ let shard_distribution =
+ ShardDistributionProposal::new(shard_number, replication_factor, &known_peers);
log::debug!(
"Suggesting distribution for {} shards for collection '{}' among {} peers {:?}",
diff --git a/lib/storage/src/dispatcher.rs b/lib/storage/src/dispatcher.rs
index 0de49aa5514..2d58e977adb 100644
--- a/lib/storage/src/dispatcher.rs
+++ b/lib/storage/src/dispatcher.rs
@@ -1,3 +1,4 @@
+use std::num::NonZeroU32;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
@@ -49,7 +50,11 @@ impl Dispatcher {
let number_of_peers = state.0.peer_count();
let shard_distribution = self
.toc
- .suggest_shard_distribution(&op, number_of_peers as u32)
+ .suggest_shard_distribution(
+ &op,
+ NonZeroU32::new(number_of_peers as u32)
+ .expect("Peer count should be always >= 1"),
+ )
.await;
CollectionMetaOperations::CreateCollectionDistributed(op, shard_distribution)
}
diff --git a/lib/storage/tests/alias_tests.rs b/lib/storage/tests/alias_tests.rs
index 8e13ae913eb..283e438a722 100644
--- a/lib/storage/tests/alias_tests.rs
+++ b/lib/storage/tests/alias_tests.rs
@@ -80,6 +80,7 @@ mod tests {
optimizers_config: None,
shard_number: Some(1),
on_disk_payload: None,
+ replication_factor: None,
},
}),
None,
diff --git a/src/consensus.rs b/src/consensus.rs
index c566a9f3659..be476c6dac2 100644
--- a/src/consensus.rs
+++ b/src/consensus.rs
@@ -269,11 +269,6 @@ impl Consensus {
// This needs to be propagated manually to other peers as it is not contained in any log entry.
state_ref.set_first_voter(all_peers.first_peer_id);
state_ref.set_conf_state(ConfState::from((vec![all_peers.first_peer_id], vec![])))?;
- // TODO: Remove in the next version after 0.10 as it does nothing and is left for compatability
- client
- .add_peer_as_participant(tonic::Request::new(api::grpc::qdrant::PeerId { id }))
- .await
- .context("Failed to add peer as participant")?;
Ok(())
}
@@ -767,6 +762,7 @@ mod tests {
optimizers_config: None,
shard_number: Some(2),
on_disk_payload: None,
+ replication_factor: None,
},
}),
None,