Commit

Experiment: ignore clock tags when replica is in partial state (qdrant#5349)

* Add internal interface to enable/disable clock tags on recoverable WAL

* Disable WAL clocks when switching replica into partial state

* Synchronize consensus at the end of stream records transfer

* Minor refactoring

* Do not send updates to replicas in recovery state

* Fix test compilation
timvisee committed Nov 8, 2024
1 parent 7daf1aa commit 758dca6
Showing 14 changed files with 91 additions and 31 deletions.
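
For orientation before the per-file diffs, here is a condensed, self-contained sketch of what the commit message above wires together. The types and names below (Wal, Replica, set_state) are illustrative stand-ins, not qdrant's actual API: moving a replica into the Partial state flips an atomic flag that tells its recoverable WAL to stop honoring clock tags, and moving it into any other state flips the flag back.

use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Clone, Copy, PartialEq, Eq)]
enum ReplicaState {
    Active,
    Partial,
}

// Stand-in for the recoverable WAL: here it only tracks whether clock tags are honored.
struct Wal {
    clocks_enabled: AtomicBool,
}

impl Wal {
    fn new() -> Self {
        Self { clocks_enabled: AtomicBool::new(true) }
    }

    // Mirrors the new `RecoverableWal::set_clocks_enabled` added in this commit.
    fn set_clocks_enabled(&self, enabled: bool) {
        self.clocks_enabled.store(enabled, Ordering::Release);
    }
}

struct Replica {
    wal: Wal,
}

impl Replica {
    // Mirrors the new logic in `ShardReplicaSet::set_replica_state`:
    // clocks are disabled only while the replica is in `Partial` state.
    fn set_state(&self, state: ReplicaState) {
        self.wal.set_clocks_enabled(state != ReplicaState::Partial);
    }
}

fn main() {
    let replica = Replica { wal: Wal::new() };

    replica.set_state(ReplicaState::Partial);
    assert!(!replica.wal.clocks_enabled.load(Ordering::Acquire));

    replica.set_state(ReplicaState::Active);
    assert!(replica.wal.clocks_enabled.load(Ordering::Acquire));

    println!("clock tag handling follows the replica state");
}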
6 changes: 4 additions & 2 deletions lib/collection/src/collection/shard_transfer.rs
@@ -227,7 +227,7 @@ impl Collection {
         };
 
         if transfer.to == self.this_peer_id {
-            replica_set.set_replica_state(transfer.to, state)?;
+            replica_set.set_replica_state(transfer.to, state).await?;
         } else {
             replica_set.add_remote(transfer.to, state).await?;
         }
@@ -319,7 +319,9 @@ impl Collection {
         // and so failed transfer does not introduce any inconsistencies to points
         // that are not affected by resharding in all other shards
         } else if transfer.sync {
-            replica_set.set_replica_state(transfer.to, ReplicaState::Dead)?;
+            replica_set
+                .set_replica_state(transfer.to, ReplicaState::Dead)
+                .await?;
         } else {
             replica_set.remove_peer(transfer.to).await?;
         }
4 changes: 4 additions & 0 deletions lib/collection/src/shards/forward_proxy_shard.rs
@@ -217,6 +217,10 @@ impl ForwardProxyShard {
     pub fn update_tracker(&self) -> &UpdateTracker {
         self.wrapped_shard.update_tracker()
     }
+
+    pub fn set_clocks_enabled(&self, enabled: bool) {
+        self.wrapped_shard.set_clocks_enabled(enabled);
+    }
 }
 
 #[async_trait]
4 changes: 4 additions & 0 deletions lib/collection/src/shards/local_shard/mod.rs
@@ -1093,6 +1093,10 @@ impl LocalShard {
     pub async fn update_cutoff(&self, cutoff: &RecoveryPoint) {
         self.wal.update_cutoff(cutoff).await
     }
+
+    pub fn set_clocks_enabled(&self, enabled: bool) {
+        self.wal.set_clocks_enabled(enabled);
+    }
 }
 
 impl Drop for LocalShard {
4 changes: 4 additions & 0 deletions lib/collection/src/shards/proxy_shard.rs
@@ -136,6 +136,10 @@ impl ProxyShard {
     pub fn update_tracker(&self) -> &UpdateTracker {
         self.wrapped_shard.update_tracker()
     }
+
+    pub fn set_clocks_enabled(&self, enabled: bool) {
+        self.wrapped_shard.set_clocks_enabled(enabled);
+    }
 }
 
 #[async_trait]
6 changes: 6 additions & 0 deletions lib/collection/src/shards/queue_proxy_shard.rs
@@ -201,6 +201,12 @@ impl QueueProxyShard {
 
         (queue_proxy.wrapped_shard, queue_proxy.remote_shard)
     }
+
+    pub fn set_clocks_enabled(&self, enabled: bool) {
+        self.inner_unchecked()
+            .wrapped_shard
+            .set_clocks_enabled(enabled);
+    }
 }
 
 #[async_trait]
14 changes: 12 additions & 2 deletions lib/collection/src/shards/replica_set/mod.rs
@@ -602,15 +602,19 @@ impl ShardReplicaSet {
         state: ReplicaState,
     ) -> CollectionResult<()> {
         if peer_id == self.this_peer_id() {
-            self.set_replica_state(peer_id, state)?;
+            self.set_replica_state(peer_id, state).await?;
         } else {
             // Create remote shard if necessary
             self.add_remote(peer_id, state).await?;
         }
         Ok(())
     }
 
-    pub fn set_replica_state(&self, peer_id: PeerId, state: ReplicaState) -> CollectionResult<()> {
+    pub async fn set_replica_state(
+        &self,
+        peer_id: PeerId,
+        state: ReplicaState,
+    ) -> CollectionResult<()> {
         log::debug!(
             "Changing local shard {}:{} state from {:?} to {state:?}",
             self.collection_id,
@@ -624,6 +628,12 @@
             }
             rs.set_peer_state(peer_id, state);
         })?;
+
+        // Disable WAL clocks only if the replica is in Partial state
+        if let Some(local) = self.local.read().await.as_ref() {
+            local.set_clocks_enabled(state != ReplicaState::Partial);
+        }
+
         self.update_locally_disabled(peer_id);
         Ok(())
     }
4 changes: 1 addition & 3 deletions lib/collection/src/shards/replica_set/shard_transfer.rs
@@ -412,9 +412,7 @@ impl ShardReplicaSet {
             return Ok(());
         };
 
-        proxy.transfer_all_missed_updates().await?;
-
-        Ok(())
+        proxy.transfer_all_missed_updates().await
     }
 
     /// Send all queue proxy updates to remote and transform into forward proxy
19 changes: 9 additions & 10 deletions lib/collection/src/shards/replica_set/update.rs
@@ -424,12 +424,9 @@ impl ShardReplicaSet {
             Some(ReplicaState::Partial) => true,
             Some(ReplicaState::Initializing) => true,
             Some(ReplicaState::Listener) => true,
-            // Recovery: keep sending updates to prevent a data race
-            // The replica on the peer may still be active for some time if its consensus is slow.
-            // The peer may respond to read requests until it switches to recovery state too. We
-            // must keep sending updates to prevent those reads being stale.
-            // See: <https://github.com/qdrant/qdrant/pull/5298>
-            Some(ReplicaState::Recovery | ReplicaState::PartialSnapshot) => true,
+            // We must not send updates to replicas in recovery state.
+            // If we do we might create gaps in WAL clock tags.
+            Some(ReplicaState::Recovery | ReplicaState::PartialSnapshot) => false,
             Some(ReplicaState::Resharding) => true,
             Some(ReplicaState::Dead) | None => false,
         };
@@ -563,10 +560,12 @@ mod tests {
         // at build time the replicas are all dead, they need to be activated
        assert_eq!(rs.highest_alive_replica_peer_id(), None);
 
-        rs.set_replica_state(1, ReplicaState::Active).unwrap();
-        rs.set_replica_state(3, ReplicaState::Active).unwrap();
-        rs.set_replica_state(4, ReplicaState::Active).unwrap();
-        rs.set_replica_state(5, ReplicaState::Partial).unwrap();
+        rs.set_replica_state(1, ReplicaState::Active).await.unwrap();
+        rs.set_replica_state(3, ReplicaState::Active).await.unwrap();
+        rs.set_replica_state(4, ReplicaState::Active).await.unwrap();
+        rs.set_replica_state(5, ReplicaState::Partial)
+            .await
+            .unwrap();
 
         assert_eq!(rs.highest_replica_peer_id(), Some(5));
         assert_eq!(rs.highest_alive_replica_peer_id(), Some(4));
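
Read as a predicate, the change above says a replica receives live updates only while its state can safely apply them; Recovery and PartialSnapshot now fall on the false side so no clock-tag gaps can appear in their WALs. A hypothetical, self-contained restatement of that rule (the enum and function are illustrative, not the actual qdrant code):

#[allow(dead_code)]
enum ReplicaState {
    Active,
    Partial,
    Initializing,
    Listener,
    Recovery,
    PartialSnapshot,
    Resharding,
    Dead,
}

// Illustrative predicate: should this peer receive live update traffic?
fn is_update_target(state: Option<ReplicaState>) -> bool {
    use ReplicaState::*;
    match state {
        Some(Active | Partial | Initializing | Listener | Resharding) => true,
        // Replicas recovering from a transfer or snapshot no longer receive
        // live updates, so their WAL clock maps cannot end up with gaps.
        Some(Recovery | PartialSnapshot) => false,
        Some(Dead) | None => false,
    }
}

fn main() {
    assert!(is_update_target(Some(ReplicaState::Active)));
    assert!(!is_update_target(Some(ReplicaState::Recovery)));
    println!("forwarding policy sketch holds");
}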
10 changes: 10 additions & 0 deletions lib/collection/src/shards/shard.rs
@@ -251,4 +251,14 @@ impl Shard {
             }
         }
     }
+
+    pub fn set_clocks_enabled(&self, enabled: bool) {
+        match self {
+            Self::Local(local_shard) => local_shard.set_clocks_enabled(enabled),
+            Self::Proxy(proxy_shard) => proxy_shard.set_clocks_enabled(enabled),
+            Self::ForwardProxy(proxy_shard) => proxy_shard.set_clocks_enabled(enabled),
+            Self::QueueProxy(proxy_shard) => proxy_shard.set_clocks_enabled(enabled),
+            Self::Dummy(_) => (),
+        }
+    }
 }
1 change: 1 addition & 0 deletions lib/collection/src/shards/shard_holder/mod.rs
@@ -701,6 +701,7 @@ impl ShardHolder {
                     log::warn!("Local shard {collection_id}:{} stuck in Initializing state, changing to Active", replica_set.shard_id);
                     replica_set
                         .set_replica_state(local_peer_id, ReplicaState::Active)
+                        .await
                         .expect("Failed to set local shard state");
                 }
                 let shard_key = shard_id_to_key_mapping.get(&shard_id).cloned();
2 changes: 1 addition & 1 deletion lib/collection/src/shards/shard_holder/resharding.rs
@@ -306,7 +306,7 @@ impl ShardHolder {
         // Revert replicas in `Resharding` state back into `Active` state
         for (peer, state) in shard.peers() {
             if state == ReplicaState::Resharding {
-                shard.set_replica_state(peer, ReplicaState::Active)?;
+                shard.set_replica_state(peer, ReplicaState::Active).await?;
             }
         }
 
2 changes: 2 additions & 0 deletions lib/collection/src/shards/transfer/driver.rs
@@ -65,6 +65,8 @@ pub async fn transfer_shard(
         progress,
         local_shard_id,
         remote_shard,
+        channel_service,
+        consensus,
         &collection_id,
     )
     .await?;
9 changes: 8 additions & 1 deletion lib/collection/src/shards/transfer/stream_records.rs
@@ -4,11 +4,13 @@ use common::counter::hardware_accumulator::HwMeasurementAcc;
 use parking_lot::Mutex;
 
 use super::transfer_tasks_pool::TransferTaskProgress;
+use super::ShardTransferConsensus;
 use crate::operations::types::{CollectionError, CollectionResult, CountRequestInternal};
+use crate::shards::channel_service::ChannelService;
 use crate::shards::remote_shard::RemoteShard;
 use crate::shards::shard::ShardId;
 use crate::shards::shard_holder::LockedShardHolder;
-use crate::shards::CollectionId;
+use crate::shards::{await_consensus_sync, CollectionId};
 
 pub(super) const TRANSFER_BATCH_SIZE: usize = 100;
 
@@ -28,6 +30,8 @@ pub(super) async fn transfer_stream_records(
     progress: Arc<Mutex<TransferTaskProgress>>,
     shard_id: ShardId,
     remote_shard: RemoteShard,
+    channel_service: ChannelService,
+    consensus: &dyn ShardTransferConsensus,
     collection_id: &CollectionId,
 ) -> CollectionResult<()> {
     let remote_peer_id = remote_shard.peer_id;
@@ -134,6 +138,9 @@
         }
     }
 
+    // Synchronize all nodes
+    await_consensus_sync(consensus, &channel_service).await;
+
     log::debug!("Ending shard {shard_id} transfer to peer {remote_peer_id} by streaming records");
 
     Ok(())
37 changes: 25 additions & 12 deletions lib/collection/src/wal_delta.rs
@@ -1,4 +1,5 @@
 use std::fmt::Debug;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 
 use parking_lot::{Mutex as ParkingMutex, MutexGuard as ParkingMutexGuard};
@@ -28,6 +29,13 @@ pub struct RecoverableWal {
     /// - (so if we advance these clocks, we have to advance `newest_clocks` as well)
     /// - this WAL cannot resolve any delta below any of these clocks
     pub(super) oldest_clocks: Arc<Mutex<ClockMap>>,
+
+    /// Whether clock tags are enabled.
+    ///
+    /// If enabled, the newest seen clocks are updated and operations with old clock tags may be
+    /// rejected. If not enabled, all operations will be accepted without affecting the newest
+    /// clocks.
+    clocks_enabled: AtomicBool,
 }
 
 impl RecoverableWal {
@@ -40,6 +48,7 @@ impl RecoverableWal {
             wal,
             newest_clocks: highest_clocks,
             oldest_clocks: cutoff_clocks,
+            clocks_enabled: AtomicBool::new(true),
         }
     }
 
@@ -55,18 +64,17 @@ impl RecoverableWal {
         operation: &mut OperationWithClockTag,
     ) -> crate::wal::Result<(u64, ParkingMutexGuard<'a, SerdeWal<OperationWithClockTag>>)> {
         // Update last seen clock map and correct clock tag if necessary
-        if let Some(clock_tag) = &mut operation.clock_tag {
-            // TODO: Do not manually advance here!
-            //
-            // TODO: What does the above `TODO` mean? "Make sure to call `advance_clock_and_correct_tag`, but not `advance_clock`?"
-            let operation_accepted = self
-                .newest_clocks
-                .lock()
-                .await
-                .advance_clock_and_correct_tag(clock_tag);
-
-            if !operation_accepted {
-                return Err(crate::wal::WalError::ClockRejected);
+        if self.clocks_enabled.load(Ordering::Acquire) {
+            if let Some(clock_tag) = &mut operation.clock_tag {
+                let operation_accepted = self
+                    .newest_clocks
+                    .lock()
+                    .await
+                    .advance_clock_and_correct_tag(clock_tag);
+
+                if !operation_accepted {
+                    return Err(crate::wal::WalError::ClockRejected);
+                }
            }
        }
 
@@ -146,6 +154,11 @@
         }
         Ok(())
     }
+
+    // TODO(timvisee): provide a better interface for this, pass param into `lock_and_write` instead?
+    pub fn set_clocks_enabled(&self, enabled: bool) {
+        self.clocks_enabled.store(enabled, Ordering::Release);
+    }
 }
 
 /// Resolve the WAL delta for the given `recovery_point`
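
To see what the clocks_enabled gate changes in behavior, here is a toy, self-contained model of the gated write path. ToyWal and ClockMap below are simplified stand-ins for the real RecoverableWal and ClockMap, and the (peer, tick) tuple clock tag is an assumption: with clocks enabled a stale tag is rejected, with clocks disabled the same operation is accepted and the newest-clock bookkeeping is left untouched.

use std::collections::HashMap;

type PeerId = u64;

// Toy clock map: newest tick seen per peer.
#[derive(Default)]
struct ClockMap {
    newest: HashMap<PeerId, u64>,
}

impl ClockMap {
    // Returns false if the tag is older than what we have already seen.
    fn advance(&mut self, peer: PeerId, tick: u64) -> bool {
        let entry = self.newest.entry(peer).or_insert(0);
        if tick < *entry {
            return false;
        }
        *entry = tick;
        true
    }
}

struct ToyWal {
    newest_clocks: ClockMap,
    clocks_enabled: bool,
}

impl ToyWal {
    fn write(&mut self, clock: Option<(PeerId, u64)>) -> Result<(), &'static str> {
        // Only validate and advance clocks while the gate is open.
        if self.clocks_enabled {
            if let Some((peer, tick)) = clock {
                if !self.newest_clocks.advance(peer, tick) {
                    return Err("clock rejected");
                }
            }
        }
        // ...append the operation to the WAL here...
        Ok(())
    }
}

fn main() {
    let mut wal = ToyWal { newest_clocks: ClockMap::default(), clocks_enabled: true };

    assert!(wal.write(Some((1, 10))).is_ok());
    assert!(wal.write(Some((1, 5))).is_err()); // stale tag rejected while enabled

    wal.clocks_enabled = false; // e.g. the replica entered Partial state
    assert!(wal.write(Some((1, 5))).is_ok()); // stale tag ignored, operation accepted
}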
