Skip to content

Commit

Permalink
Update collection config, when applying Raft snapshot (qdrant#5367)
Browse files Browse the repository at this point in the history
Co-authored-by: generall <andrey@vasnetsov.com>
  • Loading branch information
2 people authored and timvisee committed Nov 8, 2024
1 parent e0b4c02 commit 9857797
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 11 deletions.
72 changes: 64 additions & 8 deletions lib/collection/src/collection/state_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@ use crate::collection::payload_index_schema::PayloadIndexSchema;
use crate::collection::Collection;
use crate::collection_state::{ShardInfo, State};
use crate::config::CollectionConfig;
use crate::operations::types::CollectionResult;
use crate::operations::types::{CollectionError, CollectionResult};
use crate::shards::replica_set::ShardReplicaSet;
use crate::shards::shard::{PeerId, ShardId};
use crate::shards::shard_holder::{ShardKeyMapping, ShardTransferChange};
use crate::shards::transfer::ShardTransfer;

impl Collection {
/// Checks whether the params of `config` are compatible with the params of
/// this collection's current config.
///
/// Takes a read lock on the collection config and delegates to
/// `check_compatible` on the current params; that result is returned as is.
pub async fn check_config_compatible(&self, config: &CollectionConfig) -> CollectionResult<()> {
    // Hold the read guard for the duration of the comparison.
    let current_config = self.collection_config.read().await;
    current_config.params.check_compatible(&config.params)
}

pub async fn apply_state(
&self,
state: State,
Expand Down Expand Up @@ -64,18 +72,66 @@ impl Collection {
}

/// Replaces this collection's config with `new_config`.
///
/// Steps, in order:
/// 1. Under the config write lock, verify that `new_config.params` is compatible with the
///    current params; an incompatibility is returned as a service error.
/// 2. Compare every field of the new config with the current one; if nothing differs,
///    return early without touching anything.
/// 3. Store the new config, then save it to `self.path`.
/// 4. Recreate optimizers, but only if a "core" field (`params`, `hnsw_config`,
///    `optimizer_config`, `quantization_config`) changed.
///
/// # Errors
///
/// Returns a service error if the configs are incompatible, and propagates errors from
/// saving the config or recreating optimizers.
async fn apply_config(&self, new_config: CollectionConfig) -> CollectionResult<()> {
    // Assigned inside the lock scope below, consumed after the write lock is released.
    let recreate_optimizers;

    {
        let mut config = self.collection_config.write().await;

        if let Err(err) = config.params.check_compatible(&new_config.params) {
            // Stop consensus with a service error, if new config is incompatible with current one.
            //
            // We expect that `apply_config` is only called when configs are compatible, otherwise
            // collection has to be *recreated*.
            return Err(CollectionError::service_error(err.to_string()));
        }

        // Destructure `new_config`, to ensure we compare all config fields. Compiler would
        // complain, if new field is added to `CollectionConfig` struct, but not destructured
        // explicitly. We have to explicitly compare config fields, because we want to compare
        // `wal_config` and `strict_mode_config` independently of other fields.
        let CollectionConfig {
            params,
            hnsw_config,
            optimizer_config,
            wal_config,
            quantization_config,
            strict_mode_config,
        } = &new_config;

        let is_core_config_updated = params != &config.params
            || hnsw_config != &config.hnsw_config
            || optimizer_config != &config.optimizer_config
            || quantization_config != &config.quantization_config;

        let is_wal_config_updated = wal_config != &config.wal_config;
        let is_strict_mode_config_updated = strict_mode_config != &config.strict_mode_config;

        let is_config_updated =
            is_core_config_updated || is_wal_config_updated || is_strict_mode_config_updated;

        // Nothing changed: skip both the save and the optimizer recreation.
        if !is_config_updated {
            return Ok(());
        }

        if is_wal_config_updated {
            log::warn!(
                "WAL config of collection {} updated when applying Raft snapshot, \
                 but updated WAL config will only be applied on Qdrant restart",
                self.id,
            );
        }

        // `new_config` is moved here as a whole; no field may be moved out of it earlier.
        *config = new_config;

        // We need to recreate optimizers, if "core" config was updated
        recreate_optimizers = is_core_config_updated;
    }

    // The write lock is released at this point; save the stored config under a read lock.
    self.collection_config.read().await.save(&self.path)?;

    // Recreate optimizers at most once, and only when a core field changed.
    if recreate_optimizers {
        self.recreate_optimizers_blocking().await?;
    }

    Ok(())
}
Expand Down
50 changes: 49 additions & 1 deletion lib/collection/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs::File;
use std::io::{Read, Write};
use std::num::NonZeroU32;
Expand Down Expand Up @@ -116,6 +116,54 @@ impl CollectionParams {
PayloadStorageType::InMemory
}
}

pub fn check_compatible(&self, other: &CollectionParams) -> CollectionResult<()> {
let CollectionParams {
vectors,
shard_number: _, // Maybe be updated by resharding, assume local shards needs to be dropped
sharding_method, // Not changeable
replication_factor: _, // May be changed
write_consistency_factor: _, // May be changed
read_fan_out_factor: _, // May be changed
on_disk_payload: _, // May be changed
sparse_vectors, // Parameters may be changes, but not the structure
} = other;

self.vectors.check_compatible(vectors)?;

let this_sparse_vectors: HashSet<_> = if let Some(sparse_vectors) = &self.sparse_vectors {
sparse_vectors.keys().collect()
} else {
HashSet::new()
};

let other_sparse_vectors: HashSet<_> = if let Some(sparse_vectors) = sparse_vectors {
sparse_vectors.keys().collect()
} else {
HashSet::new()
};

if this_sparse_vectors != other_sparse_vectors {
return Err(CollectionError::bad_input(format!(
"sparse vectors are incompatible: \
origin sparse vectors: {this_sparse_vectors:?}, \
while other sparse vectors: {other_sparse_vectors:?}",
)));
}

let this_sharding_method = self.sharding_method.unwrap_or_default();
let other_sharding_method = sharding_method.unwrap_or_default();

if this_sharding_method != other_sharding_method {
return Err(CollectionError::bad_input(format!(
"sharding method is incompatible: \
origin sharding method: {this_sharding_method:?}, \
while other sharding method: {other_sharding_method:?}",
)));
}

Ok(())
}
}

impl Anonymize for CollectionParams {
Expand Down
28 changes: 26 additions & 2 deletions lib/storage/src/content_manager/toc/collection_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,24 @@ impl TableOfContent {
let mut collections = self.collections.write().await;

for (id, state) in &data.collections {
if let Some(collection) = collections.get(id) {
if let Err(err) = collection.check_config_compatible(&state.config).await {
log::warn!(
"Recreating collection {id}, because collection config is incompatible: \
{err}",
);

// Drop `collections` lock
drop(collections);

// Delete collection
self.delete_collection(id).await?;

// Re-acquire `collections` lock 🙄
collections = self.collections.write().await;
}
}

let collection_exists = collections.contains_key(id);

// Create collection if not present locally
Expand Down Expand Up @@ -221,8 +239,14 @@ impl TableOfContent {
}
}

// Remove collections that are present locally but are not in the snapshot state
for collection_name in collections.keys() {
// Collect names of collections that are present locally
let collection_names: Vec<_> = collections.keys().cloned().collect();

// Drop `collections` lock
drop(collections);

// Remove collections that are present locally, but are not in the snapshot state
for collection_name in &collection_names {
if !data.collections.contains_key(collection_name) {
log::debug!(
"Deleting collection {collection_name} \
Expand Down

0 comments on commit 9857797

Please sign in to comment.