Skip to content

Commit

Permalink
Peer removal API (qdrant#894)
Browse files Browse the repository at this point in the history
* Remove peer api

* Keep the same cluster status api for compatibility
  • Loading branch information
e-ivkov authored Aug 2, 2022
1 parent 76d9584 commit 0a63445
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 1 deletion.
69 changes: 69 additions & 0 deletions docs/redoc/master/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,75 @@
}
}
},
"/cluster/peer/{peer_id}": {
"delete": {
"tags": [
"cluster"
],
"summary": "Remove peer from the cluster",
"description": "Tries to remove peer from the cluster. Will return an error if peer has shards on it.",
"operationId": "remove_peer",
"parameters": [
{
"name": "peer_id",
"in": "path",
"description": "Id of the peer",
"required": true,
"schema": {
"type": "integer"
}
}
],
"responses": {
"default": {
"description": "error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
}
}
}
},
"4XX": {
"description": "error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
}
}
}
},
"200": {
"description": "successful operation",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"time": {
"type": "number",
"format": "float",
"description": "Time spent to process this request"
},
"status": {
"type": "string",
"enum": [
"ok"
]
},
"result": {
"type": "boolean"
}
}
}
}
}
}
}
}
},
"/collections": {
"get": {
"tags": [
Expand Down
10 changes: 10 additions & 0 deletions lib/storage/src/content_manager/consensus/persistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ impl Persistent {
self.save()
}

pub fn remove_peer(&mut self, peer_id: PeerId) -> Result<(), StorageError> {
self.peer_address_by_id
.write()
.remove(&peer_id)
.ok_or_else(|| StorageError::NotFound {
description: format!("Peer with id {peer_id} not found"),
})?;
self.save()
}

pub fn last_applied_entry(&self) -> Option<u64> {
self.apply_progress_queue.get_last_applied()
}
Expand Down
14 changes: 14 additions & 0 deletions lib/storage/src/content_manager/consensus_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ impl<C: CollectionContainer> ConsensusState<C> {
})?,
)
.map(|()| true),
ConsensusOperations::RemovePeer(peer_id) => self.remove_peer(peer_id).map(|()| true),
};
if let Some(on_apply) = on_apply {
if on_apply.send(result.clone()).is_err() {
Expand Down Expand Up @@ -323,6 +324,15 @@ impl<C: CollectionContainer> ConsensusState<C> {
self.persistent.write().insert_peer(peer_id, uri)
}

pub fn remove_peer(&self, peer_id: PeerId) -> Result<(), StorageError> {
if self.toc.peer_has_shards(peer_id) {
return Err(StorageError::BadRequest {
description: format!("Cannot remove peer {peer_id} as there are shards on it"),
});
}
self.persistent.write().remove_peer(peer_id)
}

pub async fn propose_consensus_op(
&self,
operation: ConsensusOperations,
Expand Down Expand Up @@ -628,6 +638,10 @@ mod tests {
) -> Result<(), crate::content_manager::errors::StorageError> {
Ok(())
}

fn peer_has_shards(&self, _: u64) -> bool {
false
}
}

fn setup_storages(
Expand Down
5 changes: 5 additions & 0 deletions lib/storage/src/content_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use collection::shard::PeerId;

use self::collection_meta_ops::CollectionMetaOperations;
use self::consensus_state::CollectionsSnapshot;
use self::errors::StorageError;
Expand Down Expand Up @@ -25,6 +27,7 @@ pub mod consensus_ops {
pub enum ConsensusOperations {
CollectionMeta(Box<CollectionMetaOperations>),
AddPeer(PeerId, String),
RemovePeer(PeerId),
}

impl TryFrom<&RaftEntry> for ConsensusOperations {
Expand All @@ -47,4 +50,6 @@ pub trait CollectionContainer {
fn collections_snapshot(&self) -> CollectionsSnapshot;

fn apply_collections_snapshot(&self, data: CollectionsSnapshot) -> Result<(), StorageError>;

fn peer_has_shards(&self, peer_id: PeerId) -> bool;
}
20 changes: 20 additions & 0 deletions lib/storage/src/content_manager/toc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,22 @@ impl TableOfContent {
}
result
}

async fn peer_has_shards(&self, peer_id: PeerId) -> bool {
for collection in self.collections.read().await.values() {
let state = collection.state(self.this_peer_id()).await;
let peers_with_shards: HashSet<_> = state.shard_to_peer.values().collect();
if peers_with_shards.contains(&peer_id) {
return true;
}
}
false
}

fn peer_has_shards_sync(&self, peer_id: PeerId) -> bool {
self.collection_management_runtime
.block_on(self.peer_has_shards(peer_id))
}
}

impl CollectionContainer for TableOfContent {
Expand All @@ -848,6 +864,10 @@ impl CollectionContainer for TableOfContent {
) -> Result<(), StorageError> {
self.apply_collections_snapshot(data)
}

fn peer_has_shards(&self, peer_id: PeerId) -> bool {
self.peer_has_shards_sync(peer_id)
}
}

// `TableOfContent` should not be dropped from async context.
Expand Down
16 changes: 16 additions & 0 deletions openapi/openapi-cluster.ytt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,19 @@ paths:
description: Get information about the current state and composition of the cluster
operationId: cluster_status
responses: #@ response(reference("ClusterStatus"))

/cluster/peer/{peer_id}:
delete:
tags:
- cluster
summary: Remove peer from the cluster
description: Tries to remove peer from the cluster. Will return an error if peer has shards on it.
operationId: remove_peer
parameters:
- name: peer_id
in: path
description: Id of the peer
required: true
schema:
type: integer
responses: #@ response(type("boolean"))
20 changes: 19 additions & 1 deletion src/actix/api/cluster_api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use actix_web::rt::time::Instant;
use actix_web::{get, web, Responder};
use actix_web::{delete, get, web, Responder};
use storage::content_manager::consensus_ops::ConsensusOperations;
use storage::content_manager::errors::StorageError;
use storage::dispatcher::Dispatcher;

use crate::actix::helpers::process_response;
Expand All @@ -11,6 +13,22 @@ async fn cluster_status(dispatcher: web::Data<Dispatcher>) -> impl Responder {
process_response(Ok(response), timing)
}

#[delete("/cluster/peer/{peer_id}")]
async fn remove_peer(dispatcher: web::Data<Dispatcher>, peer_id: web::Path<u64>) -> impl Responder {
let timing = Instant::now();
let response = match dispatcher.consensus_state() {
Some(consensus_state) => {
consensus_state
.propose_consensus_op(ConsensusOperations::RemovePeer(*peer_id), None)
.await
}
None => Err(StorageError::BadRequest {
description: "Distributed deployment is disabled.".to_string(),
}),
};
process_response(response, timing)
}

// Configure services
pub fn config_cluster_api(cfg: &mut web::ServiceConfig) {
cfg.service(cluster_status);
Expand Down

0 comments on commit 0a63445

Please sign in to comment.