Fix recover snapshot peer (qdrant#1534)
* allow recovering a distributed snapshot on a local instance + allow recovering non-existing collections

* async snapshot recovery

* fix tests

* be polite earlier
generall committed Mar 15, 2023
1 parent 128e49f commit d7379f0
Showing 16 changed files with 215 additions and 75 deletions.
11 changes: 10 additions & 1 deletion docs/redoc/master/openapi.json
@@ -1320,7 +1320,7 @@
"collections"
],
"summary": "Recover from a snapshot",
"description": "Recover local collection data from a snapshot. This will overwrite any data, stored on this node, for the collection.",
"description": "Recover local collection data from a snapshot. This will overwrite any data, stored on this node, for the collection. If collection does not exist - it will be created.",
"operationId": "recover_from_snapshot",
"parameters": [
{
@@ -1331,6 +1331,15 @@
"schema": {
"type": "string"
}
},
{
"name": "wait",
"in": "query",
"description": "If true, wait for changes to actually happen. If false - let changes happen in background. Default is true.",
"required": false,
"schema": {
"type": "boolean"
}
}
],
"requestBody": {
13 changes: 11 additions & 2 deletions lib/collection/src/collection.rs
@@ -1443,7 +1443,12 @@ impl Collection {
/// Restore collection from snapshot
///
/// This method performs blocking IO.
pub fn restore_snapshot(snapshot_path: &Path, target_dir: &Path) -> CollectionResult<()> {
pub fn restore_snapshot(
snapshot_path: &Path,
target_dir: &Path,
this_peer_id: PeerId,
is_distributed: bool,
) -> CollectionResult<()> {
// decompress archive
let archive_file = std::fs::File::open(snapshot_path)?;
let mut ar = tar::Archive::new(archive_file);
@@ -1463,7 +1468,11 @@
}
shard_config::ShardType::Temporary => {}
shard_config::ShardType::ReplicaSet { .. } => {
ReplicaSetShard::restore_snapshot(&shard_path)?
ReplicaSetShard::restore_snapshot(
&shard_path,
this_peer_id,
is_distributed,
)?
}
}
} else {
2 changes: 1 addition & 1 deletion lib/collection/src/shards/local_shard.rs
@@ -250,7 +250,7 @@ impl LocalShard {
update_runtime: Handle,
) -> CollectionResult<LocalShard> {
// initialize local shard config file
let local_shard_config = ShardConfig::new_local();
let local_shard_config = ShardConfig::new_replica_set();
let shard =
Self::build(id, collection_id, shard_path, shared_config, update_runtime).await?;
local_shard_config.save(shard_path)?;
28 changes: 1 addition & 27 deletions lib/collection/src/shards/remote_shard.rs
@@ -1,5 +1,5 @@
use std::future::Future;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::Arc;

use api::grpc::qdrant::collections_internal_client::CollectionsInternalClient;
@@ -37,7 +37,6 @@ use crate::shards::conversions::{
internal_upsert_points, try_scored_point_from_grpc,
};
use crate::shards::shard::{PeerId, ShardId};
use crate::shards::shard_config::ShardConfig;
use crate::shards::shard_trait::ShardOperation;
use crate::shards::telemetry::RemoteShardTelemetry;
use crate::shards::CollectionId;
@@ -72,35 +71,10 @@ impl RemoteShard {
}
}

/// Initialize remote shard by persisting its info on the file system.
pub fn init(
id: ShardId,
collection_id: CollectionId,
peer_id: PeerId,
shard_path: PathBuf,
channel_service: ChannelService,
) -> CollectionResult<Self> {
// initialize remote shard config file
let shard_config = ShardConfig::new_remote(peer_id);
shard_config.save(&shard_path)?;
Ok(RemoteShard::new(
id,
collection_id,
peer_id,
channel_service,
))
}

pub fn restore_snapshot(_snapshot_path: &Path) {
// NO extra actions needed for remote shards
}

pub async fn create_snapshot(&self, target_path: &Path) -> CollectionResult<()> {
let shard_config = ShardConfig::new_remote(self.peer_id);
shard_config.save(target_path)?;
Ok(())
}

fn current_address(&self) -> CollectionResult<Uri> {
let guard_peer_address = self.channel_service.id_to_address.read();
let peer_address = guard_peer_address.get(&self.peer_id).cloned();
31 changes: 30 additions & 1 deletion lib/collection/src/shards/replica_set.rs
@@ -959,9 +959,38 @@ impl ShardReplicaSet {
}
}

pub fn restore_snapshot(snapshot_path: &Path) -> CollectionResult<()> {
pub fn restore_snapshot(
snapshot_path: &Path,
this_peer_id: PeerId,
is_distributed: bool,
) -> CollectionResult<()> {
let replica_state: SaveOnDisk<ReplicaSetState> =
SaveOnDisk::load_or_init(snapshot_path.join(REPLICA_STATE_FILE))?;

// If this shard has local data
let is_snapshot_local = replica_state.read().is_local;

if !is_distributed && !is_snapshot_local {
return Err(CollectionError::service_error(format!(
"Can't restore snapshot is local mode with missing data at shard: {}",
snapshot_path.display()
)));
}

replica_state.write(|state| {
state.this_peer_id = this_peer_id;
if is_distributed {
state
.peers
.remove(&this_peer_id)
.and_then(|replica_state| state.peers.insert(this_peer_id, replica_state));
} else {
// In local mode we don't want any remote peers
state.peers.clear();
state.peers.insert(this_peer_id, ReplicaState::Active);
}
})?;

if replica_state.read().is_local {
LocalShard::restore_snapshot(snapshot_path)?;
}
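
To make the branch above concrete, here is a toy, self-contained sketch (with made-up peer ids and stringly-typed states) of what happens to the replica map when a snapshot taken in a cluster is restored onto a single local node:

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical replica map as stored in the snapshot: three peers from the
    // original cluster, keyed by peer id.
    let mut peers: HashMap<u64, &str> =
        HashMap::from([(42, "Active"), (7, "Active"), (9, "Dead")]);

    // Local (non-distributed) recovery, as in the branch above: drop every remote
    // replica and register this node as the only, active replica.
    let this_peer_id: u64 = 100;
    peers.clear();
    peers.insert(this_peer_id, "Active");

    assert_eq!(peers, HashMap::from([(100, "Active")]));
}
```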
21 changes: 3 additions & 18 deletions lib/collection/src/shards/shard_config.rs
@@ -10,9 +10,9 @@ pub const SHARD_CONFIG_FILE: &str = "shard_config.json";

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
pub enum ShardType {
Local,
Remote { peer_id: PeerId },
Temporary, // same as local, but not ready yet
Local, // Deprecated
Remote { peer_id: PeerId }, // Deprecated
Temporary, // Deprecated
ReplicaSet,
}

@@ -32,21 +32,6 @@ impl ShardConfig {
}
}

pub fn new_remote(peer_id: PeerId) -> Self {
let r#type = ShardType::Remote { peer_id };
Self { r#type }
}

pub fn new_local() -> Self {
let r#type = ShardType::Local;
Self { r#type }
}

pub fn new_temp() -> Self {
let r#type = ShardType::Temporary;
Self { r#type }
}

pub fn load(shard_path: &Path) -> CollectionResult<Option<Self>> {
let config_path = Self::get_config_path(shard_path);
if !config_path.exists() {
19 changes: 16 additions & 3 deletions lib/collection/src/tests/snapshot_test.rs
@@ -95,11 +95,24 @@ async fn test_snapshot_collection() {
.await
.unwrap();

Collection::restore_snapshot(
&snapshots_path.path().join(snapshot_description.name),
// Do not recover in local mode if some shards are remote
assert!(Collection::restore_snapshot(
&snapshots_path.path().join(&snapshot_description.name),
recover_dir.path(),
0,
false,
)
.unwrap();
.is_err());

if let Err(err) = Collection::restore_snapshot(
&snapshots_path.path().join(snapshot_description.name),
recover_dir.path(),
0,
true,
) {
collection.before_drop().await;
panic!("Failed to restore snapshot: {err}")
}

let mut recovered_collection = Collection::load(
collection_name_rec,
20 changes: 20 additions & 0 deletions lib/storage/src/content_manager/collection_meta_ops.rs
@@ -1,3 +1,4 @@
use collection::config::CollectionConfig;
use collection::operations::config_diff::{
CollectionParamsDiff, HnswConfigDiff, OptimizersConfigDiff, WalConfigDiff,
};
@@ -284,3 +285,22 @@ pub enum CollectionMetaOperations {
SetShardReplicaState(SetShardReplicaState),
Nop { token: usize }, // Empty operation
}

/// Use config of the existing collection to generate a create collection operation
/// for the new collection
impl From<CollectionConfig> for CreateCollection {
fn from(value: CollectionConfig) -> Self {
Self {
vectors: value.params.vectors,
shard_number: Some(value.params.shard_number.get()),
replication_factor: Some(value.params.replication_factor.get()),
write_consistency_factor: Some(value.params.write_consistency_factor.get()),
on_disk_payload: Some(value.params.on_disk_payload),
hnsw_config: Some(value.hnsw_config.into()),
wal_config: Some(value.wal_config.into()),
optimizers_config: Some(value.optimizer_config.into()),
init_from: None,
quantization_config: value.quantization_config,
}
}
}
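
A short sketch of how this conversion is meant to be used, mirroring the recovery code in `recover.rs` below; the helper function itself is illustrative and not part of this commit:

```rust
use collection::config::CollectionConfig;

use crate::content_manager::collection_meta_ops::{
    CollectionMetaOperations, CreateCollection, CreateCollectionOperation,
};

/// Illustrative helper: derive a create-collection operation from a snapshot's
/// stored config, so a missing collection can be created before recovery.
fn create_op_from_snapshot_config(
    collection_name: &str,
    snapshot_config: CollectionConfig,
) -> CollectionMetaOperations {
    let create: CreateCollection = snapshot_config.into();
    CollectionMetaOperations::CreateCollection(CreateCollectionOperation::new(
        collection_name.to_string(),
        create,
    ))
}
```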
4 changes: 2 additions & 2 deletions lib/storage/src/content_manager/consensus/persistent.rs
@@ -36,7 +36,7 @@ pub struct Persistent {
/// Last known cluster topology
#[serde(with = "serialize_peer_addresses")]
pub peer_address_by_id: Arc<RwLock<PeerAddressById>>,
pub this_peer_id: u64,
pub this_peer_id: PeerId,
#[serde(skip)]
pub path: PathBuf,
/// Tracks if there are some unsaved changes due to the failure on save
@@ -146,7 +146,7 @@ impl Persistent {
self.peer_address_by_id.read().clone()
}

pub fn this_peer_id(&self) -> u64 {
pub fn this_peer_id(&self) -> PeerId {
self.this_peer_id
}

55 changes: 50 additions & 5 deletions lib/storage/src/content_manager/snapshots/recover.rs
@@ -8,7 +8,11 @@ use collection::shards::shard::{PeerId, ShardId};
use collection::shards::shard_config::ShardType;
use collection::shards::shard_versioning::latest_shard_paths;

use crate::content_manager::collection_meta_ops::{
CollectionMetaOperations, CreateCollectionOperation,
};
use crate::content_manager::snapshots::download::{download_snapshot, downloaded_snapshots_dir};
use crate::dispatcher::Dispatcher;
use crate::{StorageError, TableOfContent};

async fn activate_shard(
@@ -44,13 +48,35 @@
}

pub async fn do_recover_from_snapshot(
toc: &TableOfContent,
dispatcher: &Dispatcher,
collection_name: &str,
source: SnapshotRecover,
wait: bool,
) -> Result<bool, StorageError> {
let dispatch = dispatcher.clone();
let collection_name = collection_name.to_string();
let recovery =
tokio::spawn(
async move { _do_recover_from_snapshot(dispatch, &collection_name, source).await },
);
if wait {
Ok(recovery.await??)
} else {
Ok(true)
}
}

async fn _do_recover_from_snapshot(
dispatcher: Dispatcher,
collection_name: &str,
source: SnapshotRecover,
) -> Result<bool, StorageError> {
let SnapshotRecover { location, priority } = source;
let toc = dispatcher.toc();

let this_peer_id = toc.this_peer_id;

let collection = toc.get_collection(collection_name).await?;
let is_distributed = toc.is_distributed();

let snapshot_download_path = downloaded_snapshots_dir(toc.snapshots_path());
tokio::fs::create_dir_all(&snapshot_download_path).await?;
@@ -85,12 +111,33 @@
let tmp_collection_dir_clone = tmp_collection_dir.clone();
let restoring = tokio::task::spawn_blocking(move || {
// Unpack snapshot collection to the target folder
Collection::restore_snapshot(&snapshot_path, &tmp_collection_dir_clone)
Collection::restore_snapshot(
&snapshot_path,
&tmp_collection_dir_clone,
this_peer_id,
is_distributed,
)
});
restoring.await??;

let snapshot_config = CollectionConfig::load(&tmp_collection_dir)?;

let collection = match toc.get_collection(collection_name).await.ok() {
Some(collection) => collection,
None => {
log::debug!("Collection {} does not exist, creating it", collection_name);
let operation =
CollectionMetaOperations::CreateCollection(CreateCollectionOperation::new(
collection_name.to_string(),
snapshot_config.clone().into(),
));
dispatcher
.submit_collection_meta_op(operation, None)
.await?;
toc.get_collection(collection_name).await?
}
};

let state = collection.state().await;

// Check config compatibility
@@ -110,8 +157,6 @@
}

// Deactivate collection local shards during recovery
let this_peer_id = toc.this_peer_id;

for (shard_id, shard_info) in &state.shards {
let local_shard_state = shard_info.replicas.get(&this_peer_id);
match local_shard_state {
9 changes: 9 additions & 0 deletions lib/storage/src/dispatcher.rs
@@ -133,3 +133,12 @@ impl Deref for Dispatcher {
self.toc.deref()
}
}

impl Clone for Dispatcher {
fn clone(&self) -> Self {
Self {
toc: self.toc.clone(),
consensus_state: self.consensus_state.clone(),
}
}
}
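
The `Clone` impl is what lets the new async recovery path hand the dispatcher to a background task; condensed from `do_recover_from_snapshot` in `recover.rs` above:

```rust
// Condensed from recover.rs above: the clone is moved into the spawned task so the
// recovery future owns a Dispatcher and can keep running after the request handler
// returns when wait=false.
let dispatch = dispatcher.clone();
let collection_name = collection_name.to_string();
let recovery = tokio::spawn(async move {
    _do_recover_from_snapshot(dispatch, &collection_name, source).await
});
if wait {
    recovery.await??;
}
```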
8 changes: 7 additions & 1 deletion openapi/openapi-snapshots.ytt.yaml
@@ -7,7 +7,7 @@ paths:
- snapshots
- collections
summary: Recover from a snapshot
description: Recover local collection data from a snapshot. This will overwrite any data, stored on this node, for the collection.
description: Recover local collection data from a snapshot. This will overwrite any data, stored on this node, for the collection. If collection does not exist - it will be created.
operationId: recover_from_snapshot
parameters:
- name: collection_name
@@ -16,6 +16,12 @@
required: true
schema:
type: string
- name: wait
in: query
description: "If true, wait for changes to actually happen. If false - let changes happen in background. Default is true."
required: false
schema:
type: boolean
requestBody:
description: Snapshot to recover from
content:
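
As a usage sketch of the recovery API described above: a minimal Rust client call against the recover endpoint with the new `wait` query parameter. The host, port, collection name, and snapshot location are placeholder assumptions, not part of this change.

```rust
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed endpoint shape, matching the OpenAPI description above.
    // `wait=false` returns immediately and lets recovery run in the background.
    let client = reqwest::Client::new();
    let response = client
        .put("http://localhost:6333/collections/my_collection/snapshots/recover")
        .query(&[("wait", "false")])
        .json(&json!({
            // Placeholder snapshot location (a URL or a path local to the node).
            "location": "http://node-1:6333/collections/my_collection/snapshots/snapshot-2023.snapshot"
        }))
        .send()
        .await?;
    println!("recover request accepted: {}", response.status());
    Ok(())
}
```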