Prefer owned versus referenced usage of PeerId/ShardId, they are Copy (
timvisee committed Nov 8, 2024
1 parent 4904e62 commit 1128ac8
Showing 23 changed files with 153 additions and 159 deletions.
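The motivation behind the change, with a small illustration: `PeerId` and `ShardId` are plain integer type aliases in this codebase, so they are `Copy` and at most a machine word wide. Passing them by value is as cheap as passing `&T` and removes the `&`/`*` noise at call sites that shows up throughout the diff below. The sketch that follows is a minimal stand-alone illustration of the pattern, not the real qdrant API: the `ShardHolder`/`ReplicaSet` types and the exact integer widths of the aliases are assumptions made for the example.

```rust
use std::collections::HashMap;

// Assumed integer aliases for the sketch; the real widths may differ.
type ShardId = u32;

struct ReplicaSet;

// Simplified stand-in for the real ShardHolder, just to show the API shape.
struct ShardHolder {
    shards: HashMap<ShardId, ReplicaSet>,
}

impl ShardHolder {
    // Before: borrowing a Copy type forces `&shard_id` / `*shard_id` noise at call sites.
    fn get_shard_by_ref(&self, shard_id: &ShardId) -> Option<&ReplicaSet> {
        self.shards.get(shard_id)
    }

    // After: take the Copy value directly; callers pass `shard_id` as-is.
    fn get_shard(&self, shard_id: ShardId) -> Option<&ReplicaSet> {
        // HashMap::get still wants a reference to the key, so borrow locally.
        self.shards.get(&shard_id)
    }
}

fn main() {
    let holder = ShardHolder { shards: HashMap::new() };
    let shard_id: ShardId = 7;

    // Old style: extra borrow for no benefit.
    let _ = holder.get_shard_by_ref(&shard_id);

    // New style: `shard_id` is Copy, so it can be passed by value and reused afterwards.
    let _ = holder.get_shard(shard_id);
    let _ = holder.get_shard(shard_id);
}
```

Where a lookup into a map keyed by the Copy type is still needed, the borrow moves inside the callee (as in `get_shard` above). Clippy's `trivially_copy_pass_by_ref` lint nudges toward the same by-value style, though the commit itself does not say whether it was lint-driven.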
3 changes: 1 addition & 2 deletions lib/collection/src/collection/collection_ops.rs
@@ -198,7 +198,7 @@ impl Collection {
Change::Remove(shard_id, peer_id) => (shard_id, peer_id),
};

let Some(replica_set) = shard_holder.get_shard(&shard_id) else {
let Some(replica_set) = shard_holder.get_shard(shard_id) else {
return Err(CollectionError::BadRequest {
description: format!("Shard {} of {} not found", shard_id, self.name()),
});
@@ -333,7 +333,6 @@ impl Collection {

// extract shards info
for (shard_id, replica_set) in shards_holder.get_shards() {
let shard_id = *shard_id;
let peers = replica_set.peers();

if replica_set.has_local_shard().await {
40 changes: 20 additions & 20 deletions lib/collection/src/collection/mod.rs
@@ -329,7 +329,7 @@ impl Collection {
}

pub async fn contains_shard(&self, shard_id: ShardId) -> bool {
self.shards_holder.read().await.contains_shard(&shard_id)
self.shards_holder.read().await.contains_shard(shard_id)
}

pub async fn wait_local_shard_replica_state(
@@ -340,7 +340,7 @@
) -> CollectionResult<()> {
let shard_holder_read = self.shards_holder.read().await;

let shard = shard_holder_read.get_shard(&shard_id);
let shard = shard_holder_read.get_shard(shard_id);
let Some(replica_set) = shard else {
return Err(CollectionError::NotFound {
what: "Shard {shard_id}".into(),
@@ -359,16 +359,16 @@
) -> CollectionResult<()> {
let shard_holder = self.shards_holder.read().await;
let replica_set = shard_holder
.get_shard(&shard_id)
.get_shard(shard_id)
.ok_or_else(|| shard_not_found_error(shard_id))?;

log::debug!(
"Changing shard {}:{shard_id} replica state from {:?} to {state:?}",
self.id,
replica_set.peer_state(&peer_id),
replica_set.peer_state(peer_id),
);

let current_state = replica_set.peer_state(&peer_id);
let current_state = replica_set.peer_state(peer_id);

// Validation:
//
@@ -380,7 +380,7 @@
.read()
.contains_key(&peer_id);

let replica_exists = replica_set.peer_state(&peer_id).is_some();
let replica_exists = replica_set.peer_state(peer_id).is_some();

if !peer_exists && !replica_exists {
return Err(CollectionError::bad_input(format!(
@@ -443,14 +443,14 @@
}

replica_set
.ensure_replica_with_state(&peer_id, state)
.ensure_replica_with_state(peer_id, state)
.await?;

if state == ReplicaState::Dead {
// TODO(resharding): Abort all resharding transfers!?

// Terminate transfer if source or target replicas are now dead
let related_transfers = shard_holder.get_related_transfers(&shard_id, &peer_id);
let related_transfers = shard_holder.get_related_transfers(shard_id, peer_id);

// `abort_shard_transfer` locks `shard_holder`!
drop(shard_holder);
@@ -485,7 +485,7 @@
pub async fn shard_recovery_point(&self, shard_id: ShardId) -> CollectionResult<RecoveryPoint> {
let shard_holder_read = self.shards_holder.read().await;

let shard = shard_holder_read.get_shard(&shard_id);
let shard = shard_holder_read.get_shard(shard_id);
let Some(replica_set) = shard else {
return Err(CollectionError::NotFound {
what: format!("Shard {shard_id}"),
@@ -502,7 +502,7 @@
) -> CollectionResult<()> {
let shard_holder_read = self.shards_holder.read().await;

let shard = shard_holder_read.get_shard(&shard_id);
let shard = shard_holder_read.get_shard(shard_id);
let Some(replica_set) = shard else {
return Err(CollectionError::NotFound {
what: "Shard {shard_id}".into(),
@@ -524,7 +524,7 @@
let shard_info = ShardInfo {
replicas: replicas.peers(),
};
(*shard_id, shard_info)
(shard_id, shard_info)
})
.collect(),
resharding,
@@ -579,7 +579,7 @@
}

// Check for un-reported finished transfers
let outgoing_transfers = shard_holder.get_outgoing_transfers(&self.this_peer_id);
let outgoing_transfers = shard_holder.get_outgoing_transfers(self.this_peer_id);
let tasks_lock = self.transfer_tasks.lock().await;
for transfer in outgoing_transfers {
match tasks_lock
@@ -619,29 +619,29 @@

// Check for proper replica states
for replica_set in shard_holder.all_shards() {
let this_peer_id = &replica_set.this_peer_id();
let this_peer_id = replica_set.this_peer_id();
let shard_id = replica_set.shard_id;

let peers = replica_set.peers();
let this_peer_state = peers.get(this_peer_id).copied();
let this_peer_state = peers.get(&this_peer_id).copied();
let is_last_active = peers.values().filter(|state| **state == Active).count() == 1;

if this_peer_state == Some(Initializing) {
// It is possible, that collection creation didn't report
// Try to activate shard, as the collection clearly exists
on_finish_init(*this_peer_id, shard_id);
on_finish_init(this_peer_id, shard_id);
continue;
}

if self.shared_storage_config.node_type == NodeType::Listener {
if this_peer_state == Some(Active) && !is_last_active {
// Convert active node from active to listener
on_convert_to_listener(*this_peer_id, shard_id);
on_convert_to_listener(this_peer_id, shard_id);
continue;
}
} else if this_peer_state == Some(Listener) {
// Convert listener node to active
on_convert_from_listener(*this_peer_id, shard_id);
on_convert_from_listener(this_peer_id, shard_id);
continue;
}

@@ -654,7 +654,7 @@

// Respect shard transfer limit, consider already proposed transfers in our counts
let (mut incoming, outgoing) = shard_holder.count_shard_transfer_io(this_peer_id);
incoming += proposed.get(this_peer_id).copied().unwrap_or(0);
incoming += proposed.get(&this_peer_id).copied().unwrap_or(0);
if self.check_auto_shard_transfer_limit(incoming, outgoing) {
log::trace!("Postponing automatic shard {shard_id} transfer to stay below limit on this node (incoming: {incoming}, outgoing: {outgoing})");
continue;
@@ -680,7 +680,7 @@
for replica_id in replica_set.active_remote_shards().await {
let transfer = ShardTransfer {
from: replica_id,
to: *this_peer_id,
to: this_peer_id,
shard_id,
to_shard_id: None,
sync: true,
@@ -693,7 +693,7 @@
}

// Respect shard transfer limit, consider already proposed transfers in our counts
let (incoming, mut outgoing) = shard_holder.count_shard_transfer_io(&replica_id);
let (incoming, mut outgoing) = shard_holder.count_shard_transfer_io(replica_id);
outgoing += proposed.get(&replica_id).copied().unwrap_or(0);
if self.check_auto_shard_transfer_limit(incoming, outgoing) {
log::trace!("Postponing automatic shard {shard_id} transfer to stay below limit on peer {replica_id} (incoming: {incoming}, outgoing: {outgoing})");
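A side note on the `mod.rs` hunks above: once `this_peer_id` is held by value instead of as `&PeerId`, lookups such as `peers.get(...)` and `proposed.get(...)` need an explicit `&this_peer_id` again, because `HashMap::get` takes a reference to the key. A minimal sketch of that detail, with the map type and `ReplicaState` enum simplified for the example:

```rust
use std::collections::HashMap;

// Assumed integer alias, as in the sketch above.
type PeerId = u64;

// Placeholder for the replica-state enum used in the real code.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ReplicaState {
    Active,
}

fn main() {
    let peers: HashMap<PeerId, ReplicaState> = HashMap::from([(1, ReplicaState::Active)]);

    // Previously `this_peer_id` was a `&PeerId`, so it could be passed to `get` directly.
    let this_peer_id: PeerId = 1;

    // Now that it is owned, the lookup borrows it explicitly; the value itself
    // stays usable afterwards because PeerId is Copy.
    let state = peers.get(&this_peer_id).copied();
    assert_eq!(state, Some(ReplicaState::Active));
    println!("peer {this_peer_id} state: {state:?}");
}
```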
2 changes: 1 addition & 1 deletion lib/collection/src/collection/point_ops.rs
@@ -92,7 +92,7 @@ impl Collection {
let result = tokio::task::spawn(async move {
let _update_lock = update_lock;

let Some(shard) = shard_holder.get_shard(&shard_selection) else {
let Some(shard) = shard_holder.get_shard(shard_selection) else {
return Ok(None);
};

2 changes: 1 addition & 1 deletion lib/collection/src/collection/resharding.rs
@@ -194,7 +194,7 @@ impl Collection {
if resharding_key.direction == ReshardingDirection::Down {
// Remove the shard we've now migrated all points out of
if let Some(shard_key) = &resharding_key.shard_key {
shard_holder.remove_shard_from_key_mapping(&resharding_key.shard_id, shard_key)?;
shard_holder.remove_shard_from_key_mapping(resharding_key.shard_id, shard_key)?;
}
shard_holder
.drop_and_remove_shard(resharding_key.shard_id)
28 changes: 14 additions & 14 deletions lib/collection/src/collection/shard_transfer.rs
@@ -19,7 +19,7 @@ use crate::shards::transfer::{
};

impl Collection {
pub async fn get_outgoing_transfers(&self, current_peer_id: &PeerId) -> Vec<ShardTransfer> {
pub async fn get_outgoing_transfers(&self, current_peer_id: PeerId) -> Vec<ShardTransfer> {
self.shards_holder
.read()
.await
@@ -67,10 +67,10 @@ impl Collection {
.unwrap_or(shard_transfer.shard_id);

let shards_holder = self.shards_holder.read().await;
let from_replica_set = shards_holder.get_shard(&from_shard_id).ok_or_else(|| {
let from_replica_set = shards_holder.get_shard(from_shard_id).ok_or_else(|| {
CollectionError::service_error(format!("Shard {from_shard_id} doesn't exist"))
})?;
let to_replica_set = shards_holder.get_shard(&to_shard_id).ok_or_else(|| {
let to_replica_set = shards_holder.get_shard(to_shard_id).ok_or_else(|| {
CollectionError::service_error(format!("Shard {to_shard_id} doesn't exist"))
})?;
let _was_not_transferred =
@@ -112,7 +112,7 @@ impl Collection {
debug_assert!(old_shard.is_none(), "We should not have a local shard yet");
} else {
to_replica_set
.ensure_replica_with_state(&shard_transfer.to, initial_state)
.ensure_replica_with_state(shard_transfer.to, initial_state)
.await?;
}

@@ -206,10 +206,10 @@ impl Collection {
let mut is_dest_replica_active = false;

let dest_replica_set =
shard_holder.get_shard(&transfer.to_shard_id.unwrap_or(transfer.shard_id));
shard_holder.get_shard(transfer.to_shard_id.unwrap_or(transfer.shard_id));

if let Some(replica_set) = dest_replica_set {
if replica_set.peer_state(&transfer.to).is_some() {
if replica_set.peer_state(transfer.to).is_some() {
// Promote *destination* replica/shard to `Active` if:
//
// - replica *exists*
@@ -227,7 +227,7 @@ impl Collection {
};

if transfer.to == self.this_peer_id {
replica_set.set_replica_state(&transfer.to, state)?;
replica_set.set_replica_state(transfer.to, state)?;
} else {
replica_set.add_remote(transfer.to, state).await?;
}
@@ -237,7 +237,7 @@ impl Collection {
}

// Handle *source* replica
let src_replica_set = shard_holder.get_shard(&transfer.shard_id);
let src_replica_set = shard_holder.get_shard(transfer.shard_id);

if let Some(replica_set) = src_replica_set {
if transfer.sync || is_resharding_transfer {
@@ -303,8 +303,8 @@ impl Collection {

let shard_id = transfer_key.to_shard_id.unwrap_or(transfer_key.shard_id);

if let Some(replica_set) = shard_holder.get_shard(&shard_id) {
if replica_set.peer_state(&transfer.to).is_some() {
if let Some(replica_set) = shard_holder.get_shard(shard_id) {
if replica_set.peer_state(transfer.to).is_some() {
if is_resharding_transfer {
// If *resharding* shard transfer failed, we don't need/want to change replica state:
// - on transfer failure, the whole resharding would be aborted (see below),
@@ -319,7 +319,7 @@ impl Collection {
// and so failed transfer does not introduce any inconsistencies to points
// that are not affected by resharding in all other shards
} else if transfer.sync {
replica_set.set_replica_state(&transfer.to, ReplicaState::Dead)?;
replica_set.set_replica_state(transfer.to, ReplicaState::Dead)?;
} else {
replica_set.remove_peer(transfer.to).await?;
}
@@ -362,7 +362,7 @@ impl Collection {
async move {
let shards_holder = shards_holder.read_owned().await;

let Some(replica_set) = shards_holder.get_shard(&shard_id) else {
let Some(replica_set) = shards_holder.get_shard(shard_id) else {
return Err(CollectionError::service_error(format!(
"Shard {shard_id} doesn't exist, repartition is not supported yet"
)));
@@ -390,7 +390,7 @@ impl Collection {
// We can guarantee that replica_set is not None, cause we checked it before
// and `shards_holder` is holding the lock.
// This is a workaround for lifetime checker.
let replica_set = shards_holder.get_shard(&shard_id).unwrap();
let replica_set = shards_holder.get_shard(shard_id).unwrap();
let shard_transfer_registered = shards_holder.shard_transfers.wait_for(
|shard_transfers| {
shard_transfers.iter().any(|shard_transfer| {
@@ -409,7 +409,7 @@ impl Collection {
&& replica_set.wait_for_state_condition_sync(
|state| {
state
.get_peer_state(&this_peer_id)
.get_peer_state(this_peer_id)
.map_or(false, |peer_state| peer_state.is_partial_or_recovery())
},
defaults::CONSENSUS_META_OP_WAIT,
4 changes: 2 additions & 2 deletions lib/collection/src/collection/snapshots.rs
@@ -100,7 +100,7 @@ impl Collection {
// Create snapshot of each shard
for (shard_id, replica_set) in shards_holder.get_shards() {
let shard_snapshot_path =
shard_versioning::versioned_shard_path(Path::new(""), *shard_id, 0);
shard_versioning::versioned_shard_path(Path::new(""), shard_id, 0);

// If node is listener, we can save whatever currently is in the storage
let save_wal = self.shared_storage_config.node_type != NodeType::Listener;
@@ -284,7 +284,7 @@ impl Collection {
) -> CollectionResult<SnapshotStream> {
let shard = OwnedRwLockReadGuard::try_map(
Arc::clone(&self.shards_holder).read_owned().await,
|x| x.get_shard(&shard_id),
|x| x.get_shard(shard_id),
)
.map_err(|_| shard_not_found_error(shard_id))?;

2 changes: 1 addition & 1 deletion lib/collection/src/collection/state_management.rs
@@ -96,7 +96,7 @@ impl Collection {
// and create new shards if needed

for (shard_id, shard_info) in shards {
match self.shards_holder.read().await.get_shard(&shard_id) {
match self.shards_holder.read().await.get_shard(shard_id) {
Some(replica_set) => replica_set.apply_state(shard_info.replicas).await?,
None => {
let shard_replicas: Vec<_> = shard_info.replicas.keys().copied().collect();
10 changes: 5 additions & 5 deletions lib/collection/src/shards/replica_set/execute_read_operation.rs
@@ -55,8 +55,8 @@ impl ShardReplicaSet {

let read_consistency = read_consistency.unwrap_or_default();

let local_count = usize::from(self.peer_state(&self.this_peer_id()).is_some());
let active_local_count = usize::from(self.peer_is_active(&self.this_peer_id()));
let local_count = usize::from(self.peer_state(self.this_peer_id()).is_some());
let active_local_count = usize::from(self.peer_is_active(self.this_peer_id()));

let remotes = self.remotes.read().await;

@@ -65,7 +65,7 @@
// TODO(resharding): Handle resharded shard?
let active_remotes_count = remotes
.iter()
.filter(|remote| self.peer_is_active(&remote.peer_id))
.filter(|remote| self.peer_is_active(remote.peer_id))
.count();

let total_count = local_count + remotes_count;
@@ -159,7 +159,7 @@ impl ShardReplicaSet {
Err(_) => (self.local.read().right_future(), false, None),
};

let local_is_active = self.peer_is_active(&self.this_peer_id());
let local_is_active = self.peer_is_active(self.this_peer_id());

let local_operation = if local_is_active {
let local_operation = async {
@@ -183,7 +183,7 @@
// TODO(resharding): Handle resharded shard?
let mut active_remotes: Vec<_> = remotes
.iter()
.filter(|remote| self.peer_is_active(&remote.peer_id))
.filter(|remote| self.peer_is_active(remote.peer_id))
.collect();

active_remotes.shuffle(&mut rand::thread_rng());