Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init collection from #1364

Merged
merged 11 commits into from
Jan 22, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/grpc/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@
| vectors_config | [VectorsConfig](#qdrant-VectorsConfig) | optional | Configuration for vectors |
| replication_factor | [uint32](#uint32) | optional | Number of replicas of each shard that network tries to maintain, default = 1 |
| write_consistency_factor | [uint32](#uint32) | optional | How many replicas should apply the operation for us to consider it successful, default = 1 |
| init_from_collection | [string](#string) | optional | Specify name of the other collection to copy data from |



Expand Down
24 changes: 24 additions & 0 deletions docs/redoc/master/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -4261,6 +4261,18 @@
"nullable": true
}
]
},
"init_from": {
"description": "Specify other collection to copy data from.",
"default": null,
"anyOf": [
{
"$ref": "#/components/schemas/InitFrom"
},
{
"nullable": true
}
]
}
}
},
Expand Down Expand Up @@ -4391,6 +4403,18 @@
}
}
},
"InitFrom": {
"description": "Operation for creating new collection and (optionally) specify index params",
"type": "object",
"required": [
"collection"
],
"properties": {
"collection": {
"type": "string"
}
}
},
"UpdateCollection": {
"description": "Operation for updating parameters of the existing collection",
"type": "object",
Expand Down
1 change: 1 addition & 0 deletions lib/api/src/grpc/proto/collections.proto
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ message CreateCollection {
optional VectorsConfig vectors_config = 10; // Configuration for vectors
optional uint32 replication_factor = 11; // Number of replicas of each shard that network tries to maintain, default = 1
optional uint32 write_consistency_factor = 12; // How many replicas should apply the operation for us to consider it successful, default = 1
optional string init_from_collection = 13; // Specify name of the other collection to copy data from
}

message UpdateCollection {
Expand Down
3 changes: 3 additions & 0 deletions lib/api/src/grpc/qdrant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ pub struct CreateCollection {
/// How many replicas should apply the operation for us to consider it successful, default = 1
#[prost(uint32, optional, tag = "12")]
pub write_consistency_factor: ::core::option::Option<u32>,
/// Specify name of the other collection to copy data from
#[prost(string, optional, tag = "13")]
pub init_from_collection: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
43 changes: 42 additions & 1 deletion lib/collection/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ use semver::Version;
use tar::Builder as TarBuilder;
use tokio::fs::{copy, create_dir_all, remove_dir_all, remove_file, rename};
use tokio::runtime::Handle;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::{Mutex, RwLock, RwLockWriteGuard};

use crate::collection_state::{ShardInfo, State};
use crate::common::is_ready::IsReady;
use crate::config::CollectionConfig;
use crate::hash_ring::HashRing;
use crate::operations::config_diff::{CollectionParamsDiff, DiffConfig, OptimizersConfigDiff};
Expand Down Expand Up @@ -81,6 +82,13 @@ pub struct Collection {
#[allow(dead_code)] //Might be useful in case of repartition implementation
notify_peer_failure_cb: OnPeerFailure,
init_time: Duration,
// One-way boolean flag that is set to true when the collection is fully initialized
// i.e. all shards are activated for the first time.
is_initialized: Arc<IsReady>,
// Lock to temporary block collection update operations while the collection is being migrated.
// Lock is acquired for read on update operation and can be acquired for write externally,
// which will block all update operations until the lock is released.
updates_lock: RwLock<()>,
}

impl Collection {
Expand Down Expand Up @@ -151,6 +159,8 @@ impl Collection {
request_shard_transfer_cb: request_shard_transfer.clone(),
notify_peer_failure_cb: on_replica_failure.clone(),
init_time: start_time.elapsed(),
is_initialized: Arc::new(Default::default()),
updates_lock: RwLock::new(()),
})
}

Expand Down Expand Up @@ -250,6 +260,8 @@ impl Collection {
request_shard_transfer_cb: request_shard_transfer.clone(),
notify_peer_failure_cb: on_replica_failure,
init_time: start_time.elapsed(),
is_initialized: Arc::new(Default::default()),
updates_lock: RwLock::new(()),
}
}

Expand Down Expand Up @@ -328,6 +340,25 @@ impl Collection {
}
}

if !self.is_initialized.check_ready() {
// If not initialized yet, we need to check if it was initialized by this call
let state = self.state().await;
let mut is_fully_active = true;
for (_shard_id, shard_info) in state.shards {
if shard_info
.replicas
.into_iter()
.any(|(_peer_id, state)| state != ReplicaState::Active)
{
is_fully_active = false;
break;
}
}
if is_fully_active {
self.is_initialized.make_ready();
}
}

// Try to request shard transfer if replicas on the current peer are dead
if state == ReplicaState::Dead && self.this_peer_id == peer_id {
let transfer_from = replica_set
Expand Down Expand Up @@ -624,6 +655,7 @@ impl Collection {
shard_selection: ShardId,
wait: bool,
) -> CollectionResult<UpdateResult> {
let _update_lock = self.updates_lock.read().await;
let shard_holder_guard = self.shards_holder.read().await;

let res = match shard_holder_guard.get_shard(&shard_selection) {
Expand All @@ -647,6 +679,7 @@ impl Collection {
wait: bool,
) -> CollectionResult<UpdateResult> {
operation.validate()?;
let _update_lock = self.updates_lock.read().await;

let mut results = {
let shards_holder = self.shards_holder.read().await;
Expand Down Expand Up @@ -1418,6 +1451,14 @@ impl Collection {

Ok(())
}

pub fn wait_collection_initiated(&self, timeout: Duration) -> bool {
self.is_initialized.await_ready_for_timeout(timeout)
}

pub async fn lock_updates(&self) -> RwLockWriteGuard<()> {
self.updates_lock.write().await
}
}

impl Drop for Collection {
Expand Down
1 change: 1 addition & 0 deletions lib/collection/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod is_ready;
pub mod stoppable_task;
pub mod stoppable_task_async;
2 changes: 1 addition & 1 deletion lib/collection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
pub mod collection;
pub mod collection_manager;
pub mod collection_state;
mod common;
pub mod common;
pub mod config;
pub mod hash_ring;
pub mod operations;
Expand Down
10 changes: 10 additions & 0 deletions lib/storage/src/content_manager/collection_meta_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ impl From<RenameAlias> for AliasOperations {
}
}

/// Operation for creating new collection and (optionally) specify index params
#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "snake_case")]
pub struct InitFrom {
pub collection: CollectionId,
}

/// Operation for creating new collection and (optionally) specify index params
#[derive(Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq, Hash, Clone)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -120,6 +127,9 @@ pub struct CreateCollection {
pub wal_config: Option<WalConfigDiff>,
/// Custom params for Optimizers. If none - values from service configuration file are used.
pub optimizers_config: Option<OptimizersConfigDiff>,
/// Specify other collection to copy data from.
#[serde(default)]
pub init_from: Option<InitFrom>,
}

/// Operation for creating new collection and (optionally) specify index params
Expand Down
3 changes: 2 additions & 1 deletion lib/storage/src/content_manager/collections_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use std::collections::HashMap;

use async_trait::async_trait;
use collection::collection::Collection;
use collection::shards::CollectionId;

use crate::content_manager::errors::StorageError;

pub type Collections = HashMap<String, Collection>;
pub type Collections = HashMap<CollectionId, Collection>;

#[async_trait]
pub trait Checker {
Expand Down
1 change: 0 additions & 1 deletion lib/storage/src/content_manager/consensus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod consensus_wal;
pub mod entry_queue;
pub mod is_ready;
pub mod operation_sender;
pub mod persistent;
2 changes: 1 addition & 1 deletion lib/storage/src/content_manager/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::time::Duration;
use anyhow::{anyhow, Context};
use chrono::Utc;
use collection::collection_state;
use collection::common::is_ready::IsReady;
use collection::shards::shard::PeerId;
use collection::shards::CollectionId;
use futures::future::join_all;
Expand All @@ -27,7 +28,6 @@ use super::errors::StorageError;
use super::CollectionContainer;
use crate::content_manager::consensus::consensus_wal::ConsensusOpWal;
use crate::content_manager::consensus::entry_queue::EntryId;
use crate::content_manager::consensus::is_ready::IsReady;
use crate::content_manager::consensus::operation_sender::OperationSender;
use crate::content_manager::consensus::persistent::Persistent;
use crate::types::{
Expand Down
5 changes: 4 additions & 1 deletion lib/storage/src/content_manager/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tonic::Status;
use crate::content_manager::collection_meta_ops::{
AliasOperations, ChangeAliasesOperation, CollectionMetaOperations, CreateAlias,
CreateAliasOperation, CreateCollection, CreateCollectionOperation, DeleteAlias,
DeleteAliasOperation, DeleteCollectionOperation, RenameAlias, RenameAliasOperation,
DeleteAliasOperation, DeleteCollectionOperation, InitFrom, RenameAlias, RenameAliasOperation,
UpdateCollection, UpdateCollectionOperation,
};
use crate::content_manager::errors::StorageError;
Expand Down Expand Up @@ -56,6 +56,9 @@ impl TryFrom<api::grpc::qdrant::CreateCollection> for CollectionMetaOperations {
on_disk_payload: value.on_disk_payload,
replication_factor: value.replication_factor,
write_consistency_factor: value.write_consistency_factor,
init_from: value
.init_from_collection
.map(|v| InitFrom { collection: v }),
},
)))
}
Expand Down
Loading