Revert "Revert replication changes (qdrant#1055)" (qdrant#1056)
This reverts commit a2ed0cf.
e-ivkov authored Sep 23, 2022
1 parent: a2ed0cf, commit: 9a3fad4
Showing 21 changed files with 466 additions and 247 deletions.
19 changes: 19 additions & 0 deletions docs/grpc/docs.md
@@ -12,6 +12,7 @@
- [CollectionInfo.PayloadSchemaEntry](#qdrant-CollectionInfo-PayloadSchemaEntry)
- [CollectionOperationResponse](#qdrant-CollectionOperationResponse)
- [CollectionParams](#qdrant-CollectionParams)
- [CollectionParamsDiff](#qdrant-CollectionParamsDiff)
- [CreateAlias](#qdrant-CreateAlias)
- [CreateCollection](#qdrant-CreateCollection)
- [DeleteAlias](#qdrant-DeleteAlias)
@@ -273,6 +274,22 @@
| shard_number | [uint32](#uint32) | | Number of shards in collection |
| on_disk_payload | [bool](#bool) | | If true - point's payload will not be stored in memory |
| vectors_config | [VectorsConfig](#qdrant-VectorsConfig) | optional | Configuration for vectors |
| replication_factor | [uint32](#uint32) | | Number of replicas of each shard that network tries to maintain |






<a name="qdrant-CollectionParamsDiff"></a>

### CollectionParamsDiff



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| replication_factor | [uint32](#uint32) | optional | Number of replicas of each shard that network tries to maintain |



@@ -311,6 +328,7 @@
| on_disk_payload | [bool](#bool) | optional | If true - point&#39;s payload will not be stored in memory |
| timeout | [uint64](#uint64) | optional | Wait timeout for operation commit in seconds, if not specified - default value will be supplied |
| vectors_config | [VectorsConfig](#qdrant-VectorsConfig) | optional | Configuration for vectors |
| replication_factor | [uint32](#uint32) | optional | Number of replicas of each shard that network tries to maintain, default = 1 |



@@ -543,6 +561,7 @@ If indexation speed have more priority for your - make this parameter lower. If
| collection_name | [string](#string) | | Name of the collection |
| optimizers_config | [OptimizersConfigDiff](#qdrant-OptimizersConfigDiff) | optional | New configuration parameters for the collection |
| timeout | [uint64](#uint64) | optional | Wait timeout for operation commit in seconds, if not specified - default value will be supplied |
| params | [CollectionParamsDiff](#qdrant-CollectionParamsDiff) | optional | New configuration parameters for the collection |



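
Note: the new `replication_factor` and `params` fields correspond to the prost-generated Rust types added later in this commit (lib/api/src/grpc/qdrant.rs). A minimal sketch of setting them from a client, assuming those generated structs are in scope; the import path and use of `Default` are illustrative, not part of this diff:

use api::grpc::qdrant::{CollectionParamsDiff, CreateCollection, UpdateCollection};

fn main() {
    // Ask for 2 replicas of each shard at creation time (new field, tag 11;
    // the server falls back to 1 when the field is omitted).
    let create = CreateCollection {
        collection_name: "my_collection".into(),
        replication_factor: Some(2),
        ..Default::default()
    };

    // Later, change only the replication factor through the diff message (tag 4);
    // unset diff fields leave the current collection params untouched.
    let update = UpdateCollection {
        collection_name: "my_collection".into(),
        params: Some(CollectionParamsDiff {
            replication_factor: Some(3),
        }),
        ..Default::default()
    };
    let _ = (create, update); // would be sent via the collections gRPC service
}
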
38 changes: 38 additions & 0 deletions docs/redoc/master/openapi.json
@@ -2675,6 +2675,13 @@
"format": "uint32",
"minimum": 1
},
"replication_factor": {
"description": "Number of replicas for each shard",
"default": 1,
"type": "integer",
"format": "uint32",
"minimum": 1
},
"on_disk_payload": {
"description": "If true - point's payload will not be stored in memory. It will be read from the disk every time it is requested. This setting saves RAM by (slightly) increasing the response time. Note: those payload values that are involved in filtering and are indexed - remain in RAM.",
"default": false,
@@ -3833,6 +3840,14 @@
"minimum": 0,
"nullable": true
},
"replication_factor": {
"description": "Number of shards replicas. Default is 1 Minimum is 1",
"default": null,
"type": "integer",
"format": "uint32",
"minimum": 0,
"nullable": true
},
"on_disk_payload": {
"description": "If true - point's payload will not be stored in memory. It will be read from the disk every time it is requested. This setting saves RAM by (slightly) increasing the response time. Note: those payload values that are involved in filtering and are indexed - remain in RAM.",
"default": null,
@@ -3993,6 +4008,29 @@
"nullable": true
}
]
},
"params": {
"description": "Collection base params. If none - values from service configuration file are used.",
"anyOf": [
{
"$ref": "#/components/schemas/CollectionParamsDiff"
},
{
"nullable": true
}
]
}
}
},
"CollectionParamsDiff": {
"type": "object",
"properties": {
"replication_factor": {
"description": "Number of replicas for each shard",
"type": "integer",
"format": "uint32",
"minimum": 1,
"nullable": true
}
}
},
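
On the REST side, the schema additions above imply request bodies like the following. A sketch that only builds the JSON (endpoint paths are given as assumptions; they are not part of this diff, and other required creation params are elided):

use serde_json::json;

fn main() {
    // Collection creation body (e.g. PUT /collections/{name}; path assumed).
    let create_body = json!({
        "replication_factor": 2
    });

    // Partial update body (e.g. PATCH /collections/{name}). Every field of
    // CollectionParamsDiff is nullable, so only the fields present are changed.
    let update_body = json!({
        "params": { "replication_factor": 3 }
    });

    println!("{create_body}");
    println!("{update_body}");
}
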
7 changes: 7 additions & 0 deletions lib/api/src/grpc/proto/collections.proto
@@ -157,12 +157,14 @@ message CreateCollection {
optional bool on_disk_payload = 8; // If true - point's payload will not be stored in memory
optional uint64 timeout = 9; // Wait timeout for operation commit in seconds, if not specified - default value will be supplied
optional VectorsConfig vectors_config = 10; // Configuration for vectors
optional uint32 replication_factor = 11; // Number of replicas of each shard that network tries to maintain, default = 1
}

message UpdateCollection {
string collection_name = 1; // Name of the collection
optional OptimizersConfigDiff optimizers_config = 2; // New configuration parameters for the collection
optional uint64 timeout = 3; // Wait timeout for operation commit in seconds, if not specified - default value will be supplied
optional CollectionParamsDiff params = 4; // New configuration parameters for the collection
}

message DeleteCollection {
@@ -181,6 +183,11 @@ message CollectionParams {
uint32 shard_number = 3; // Number of shards in collection
bool on_disk_payload = 4; // If true - point's payload will not be stored in memory
optional VectorsConfig vectors_config = 5; // Configuration for vectors
uint32 replication_factor = 6; // Number of replicas of each shard that network tries to maintain
}

message CollectionParamsDiff {
optional uint32 replication_factor = 1; // Number of replicas of each shard that network tries to maintain
}

message CollectionConfig {
15 changes: 15 additions & 0 deletions lib/api/src/grpc/qdrant.rs
@@ -175,6 +175,9 @@ pub struct CreateCollection {
/// Configuration for vectors
#[prost(message, optional, tag="10")]
pub vectors_config: ::core::option::Option<VectorsConfig>,
/// Number of replicas of each shard that network tries to maintain, default = 1
#[prost(uint32, optional, tag="11")]
pub replication_factor: ::core::option::Option<u32>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateCollection {
@@ -187,6 +190,9 @@ pub struct UpdateCollection {
/// Wait timeout for operation commit in seconds, if not specified - default value will be supplied
#[prost(uint64, optional, tag="3")]
pub timeout: ::core::option::Option<u64>,
/// New configuration parameters for the collection
#[prost(message, optional, tag="4")]
pub params: ::core::option::Option<CollectionParamsDiff>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DeleteCollection {
@@ -217,6 +223,15 @@ pub struct CollectionParams {
/// Configuration for vectors
#[prost(message, optional, tag="5")]
pub vectors_config: ::core::option::Option<VectorsConfig>,
/// Number of replicas of each shard that network tries to maintain
#[prost(uint32, tag="6")]
pub replication_factor: u32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CollectionParamsDiff {
/// Number of replicas of each shard that network tries to maintain
#[prost(uint32, optional, tag="1")]
pub replication_factor: ::core::option::Option<u32>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CollectionConfig {
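
Note the asymmetry in the generated types: `CollectionParams.replication_factor` is a plain `u32`, while the diff variant is `Option<u32>`, so an unset diff field leaves the current value alone. `collection.rs` below applies this via `params_diff.update(&config.params)`. A hypothetical sketch of such a merge (the real implementation lives in lib/collection and is not shown in this commit; error handling elided):

use std::num::NonZeroU32;

struct CollectionParams {
    replication_factor: NonZeroU32,
    // ...other params elided
}

struct CollectionParamsDiff {
    replication_factor: Option<NonZeroU32>,
}

impl CollectionParamsDiff {
    // Apply only the fields that are set, keeping everything else as-is.
    fn update(&self, params: &CollectionParams) -> CollectionParams {
        CollectionParams {
            replication_factor: self
                .replication_factor
                .unwrap_or(params.replication_factor),
        }
    }
}

fn main() {
    let params = CollectionParams {
        replication_factor: NonZeroU32::new(1).unwrap(),
    };
    let diff = CollectionParamsDiff {
        replication_factor: NonZeroU32::new(3),
    };
    assert_eq!(diff.update(&params).replication_factor.get(), 3);
}
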
120 changes: 61 additions & 59 deletions lib/collection/src/collection.rs
@@ -1,7 +1,7 @@
use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::num::NonZeroU32;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;

@@ -35,11 +35,11 @@ use crate::operations::types::{
};
use crate::operations::{CollectionUpdateOperations, Validate};
use crate::optimizers_builder::OptimizersConfig;
use crate::shard::collection_shard_distribution::CollectionShardDistribution;
use crate::shard::collection_shard_distribution::{self, CollectionShardDistribution};
use crate::shard::local_shard::LocalShard;
use crate::shard::remote_shard::RemoteShard;
use crate::shard::replica_set::ReplicaSet;
use crate::shard::shard_config::{ShardConfig, ShardType};
use crate::shard::shard_config::{self, ShardConfig};
use crate::shard::shard_holder::{LockedShardHolder, ShardHolder};
use crate::shard::shard_versioning::versioned_shard_path;
use crate::shard::transfer::shard_transfer::{
@@ -80,8 +80,10 @@ impl Collection {
self.id.clone()
}

#[allow(clippy::too_many_arguments)]
pub async fn new(
id: CollectionId,
this_peer_id: PeerId,
path: &Path,
snapshots_path: &Path,
config: &CollectionConfig,
@@ -97,61 +99,61 @@
let mut shard_holder = ShardHolder::new(path, HashRing::fair(HASH_RING_SHARD_SCALE))?;

let shared_config = Arc::new(RwLock::new(config.clone()));
for shard_id in shard_distribution.local {
let shard_path = create_shard_dir(path, shard_id).await;
let shard = match shard_path {
Ok(shard_path) => {
LocalShard::build(shard_id, id.clone(), &shard_path, shared_config.clone())
.await
for (shard_id, shard_type) in shard_distribution.shards {
let shard = match shard_type {
collection_shard_distribution::ShardType::Local => {
let shard_path = create_shard_dir(path, shard_id).await;
let shard = match shard_path {
Ok(shard_path) => {
LocalShard::build(
shard_id,
id.clone(),
&shard_path,
shared_config.clone(),
)
.await
}
Err(e) => Err(e),
};
shard.map(Shard::Local)
}
Err(e) => Err(e),
};

let shard = match shard {
Ok(shard) => shard,
Err(err) => {
shard_holder.before_drop().await;
return Err(err);
collection_shard_distribution::ShardType::Remote(peer_id) => {
create_shard_dir(path, shard_id)
.await
.and_then(|shard_path| {
RemoteShard::init(
shard_id,
id.clone(),
peer_id,
shard_path,
channel_service.clone(),
)
})
.map(Shard::Remote)
}
};
shard_holder.add_shard(shard_id, Shard::Local(shard));
}

for (shard_id, peer_id) in shard_distribution.remote {
let shard = create_shard_dir(path, shard_id)
.await
.and_then(|shard_path| {
RemoteShard::init(
collection_shard_distribution::ShardType::ReplicaSet { local, remote } => {
ReplicaSet::build(
shard_id,
id.clone(),
peer_id,
shard_path,
channel_service.clone(),
this_peer_id,
local,
remote,
on_replica_failure.clone(),
path,
shared_config.clone(),
)
});
.await
.map(Shard::ReplicaSet)
}
};
let shard = match shard {
Ok(shard) => shard,
Err(err) => {
shard_holder.before_drop().await;
return Err(err);
}
};
shard_holder.add_shard(shard_id, Shard::Remote(shard));
}
// This is a stub to check that types and hidden lifetimes fit
// It might be worth leaving it here for now until we have an actual impl
// TODO: Create ReplicaSet shards
if false {
let shard = ReplicaSet::new(
1,
1,
Default::default(),
Default::default(),
Default::default(),
0.0,
on_replica_failure,
);
shard_holder.add_shard(0, Shard::ReplicaSet(shard))
shard_holder.add_shard(shard_id, shard);
}

let locked_shard_holder = Arc::new(LockedShardHolder::new(shard_holder));
@@ -1037,24 +1039,22 @@ impl Collection {
Ok(points)
}

#[allow(unreachable_code, clippy::diverging_sub_expression)]
pub async fn update_params_from_diff(
&self,
params_diff: CollectionParamsDiff,
) -> CollectionResult<()> {
let mut config = self.config.write().await;
let old_repl_factor = config.params.replication_factor;
config.params = params_diff.update(&config.params)?;
self.handle_repl_factor_change(old_repl_factor, config.params.replication_factor);
self.handle_replica_changes(todo!("supply replica changes"));
Ok(())
}

pub fn handle_repl_factor_change(&self, old: NonZeroU32, new: NonZeroU32) {
if old != new {
// TODO: remove or add replicas. In case of replica addition:
// 1. Create and mark them as inactive
// 2. Copy data
// 3. Mark them as active
}
pub fn handle_replica_changes(&self, _replica_changes: HashSet<replica_set::Change>) {
// TODO: remove or add replicas. In case of replica addition:
// 1. Create and mark them as inactive
// 2. Copy data
// 3. Mark them as active
}

/// Updates shard optimization params:
@@ -1240,7 +1240,7 @@ impl Collection {
.map(|(shard_id, shard)| {
let shard_info = match shard {
Shard::ReplicaSet(replicas) => ShardInfo::ReplicaSet {
replicas: replicas.replica_state.clone(),
replicas: replicas.replica_state.deref().clone(),
},
shard => ShardInfo::Single(
*shard
@@ -1370,9 +1370,11 @@ impl Collection {
let shard_config_opt = ShardConfig::load(&shard_path)?;
if let Some(shard_config) = shard_config_opt {
match shard_config.r#type {
ShardType::Local => LocalShard::restore_snapshot(&shard_path)?,
ShardType::Remote { .. } => RemoteShard::restore_snapshot(&shard_path),
ShardType::Temporary => {}
shard_config::ShardType::Local => LocalShard::restore_snapshot(&shard_path)?,
shard_config::ShardType::Remote { .. } => {
RemoteShard::restore_snapshot(&shard_path)
}
shard_config::ShardType::Temporary => {}
}
} else {
return Err(CollectionError::service_error(format!(
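
The restructured Collection::new now builds each shard from a per-shard type instead of separate local and remote lists. Reconstructed from the match arms above, the distribution type has roughly this shape; variant names match the code, but the payload types of `local` and `remote` are not visible in this hunk and are placeholders here:

type ShardId = u32;
type PeerId = u64;

enum ShardType {
    Local,          // built as a LocalShard on this peer
    Remote(PeerId), // initialized as a RemoteShard pointing at another peer
    ReplicaSet {
        local: bool,         // placeholder; actual type not shown in the diff
        remote: Vec<PeerId>, // placeholder; actual type not shown in the diff
    },
}

struct CollectionShardDistribution {
    // Iterated as `for (shard_id, shard_type) in shard_distribution.shards`;
    // the concrete container is not shown in the hunk.
    shards: Vec<(ShardId, ShardType)>,
}

fn main() {
    let dist = CollectionShardDistribution {
        shards: vec![(0, ShardType::Local), (1, ShardType::Remote(42))],
    };
    for (shard_id, shard_type) in dist.shards {
        match shard_type {
            ShardType::Local => println!("shard {shard_id}: local"),
            ShardType::Remote(peer) => println!("shard {shard_id}: on peer {peer}"),
            ShardType::ReplicaSet { local, remote } => {
                println!("shard {shard_id}: replica set (local={local}, {} remote)", remote.len())
            }
        }
    }
}
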
4 changes: 2 additions & 2 deletions lib/collection/src/collection_state.rs
@@ -64,6 +64,7 @@ impl State {
Ok(())
}

#[allow(unreachable_code, clippy::diverging_sub_expression)]
async fn apply_config(
new_config: CollectionConfig,
collection: &Collection,
@@ -74,9 +75,8 @@
.await?;
// updating replication factor
let mut config = collection.config.write().await;
let old_repl_factor = config.params.replication_factor;
config.params.replication_factor = new_config.params.replication_factor;
collection.handle_repl_factor_change(old_repl_factor, config.params.replication_factor);
collection.handle_replica_changes(todo!("Calculate and add changes"));
Ok(())
}

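
Both call sites now funnel replication-factor updates into `handle_replica_changes(HashSet<replica_set::Change>)`, with the change-set computation left as a `todo!`. The `Change` type itself is not part of this diff; purely as a hypothetical illustration, a change set could look like:

use std::collections::HashSet;

type ShardId = u32;
type PeerId = u64;

// Hypothetical stand-in for replica_set::Change; the real definition is not
// shown in this commit.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Change {
    Add { shard: ShardId, peer: PeerId },
    Remove { shard: ShardId, peer: PeerId },
}

fn main() {
    // Raising replication_factor from 1 to 2 might add one replica per shard.
    let changes: HashSet<Change> = (0..3)
        .map(|shard| Change::Add { shard, peer: 42 })
        .collect();
    assert_eq!(changes.len(), 3);
}
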
2 changes: 0 additions & 2 deletions lib/collection/src/config.rs
@@ -53,8 +53,6 @@ pub struct CollectionParams {
#[serde(default = "default_shard_number")]
pub shard_number: NonZeroU32,
/// Number of replicas for each shard
// TODO: do not skip in v1.0 (when replication ships)
#[serde(skip)]
#[serde(default = "default_replication_factor")]
pub replication_factor: NonZeroU32,
/// If true - point's payload will not be stored in memory.
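
Dropping `#[serde(skip)]` means `replication_factor` now round-trips through the persisted collection config, while `#[serde(default = "default_replication_factor")]` keeps configs written before this change (without the field) loadable. A small sketch of that behavior, reduced to the relevant field; serde_json stands in for the real config format:

use serde::{Deserialize, Serialize};
use std::num::NonZeroU32;

fn default_replication_factor() -> NonZeroU32 {
    NonZeroU32::new(1).unwrap()
}

#[derive(Serialize, Deserialize, Debug)]
struct CollectionParams {
    #[serde(default = "default_replication_factor")]
    replication_factor: NonZeroU32,
}

fn main() {
    // A config persisted before this change: the field is absent, default kicks in.
    let old: CollectionParams = serde_json::from_str("{}").unwrap();
    assert_eq!(old.replication_factor.get(), 1);

    // Configs written from now on carry the field explicitly.
    let new = serde_json::to_string(&CollectionParams {
        replication_factor: NonZeroU32::new(2).unwrap(),
    })
    .unwrap();
    assert_eq!(new, r#"{"replication_factor":2}"#);
}
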