Revert experiment: ignore WAL clocks in partial state #5353

Merged
merged 6 commits on Nov 8, 2024
Revert "Experiment: in stream records, set cutoff to latest clocks re…
…ceiver is guaranteed to have (#5375)"

This reverts commit e843647.
timvisee committed Nov 8, 2024
commit 921016bc3e1a61be0594ac540375cd6fbddec09d
2 changes: 1 addition & 1 deletion lib/collection/src/shards/replica_set/mod.rs
@@ -1116,8 +1116,8 @@ impl ReplicaState {
/// Check whether this is a state in which we ignore local clocks.
///
/// During some replica states, using clocks may create gaps. That'll be problematic if WAL
/// delta recovery is used later, resulting in missing operations. In these states we ignore
/// clocks altogether to prevent this problem.
#[inline]
pub const fn is_ignore_local_clocks(self) -> bool {
match self {
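As an aside for readers of this hunk: the reasoning in the doc comment above can be illustrated with a small, self-contained sketch. The enum below is not Qdrant's real `ReplicaState`; the variant names and the set of states that ignore clocks are arbitrary stand-ins, since which states belong in that set is exactly what the experiment and this revert adjust.

```rust
// Illustrative sketch only: arbitrary stand-in states, not Qdrant's ReplicaState.
#[derive(Debug, Clone, Copy)]
enum ReplicaStateSketch {
    Active,
    UnderRecovery,
    Dead,
}

impl ReplicaStateSketch {
    /// Check whether this is a state in which local clocks are ignored.
    ///
    /// Accepting clock ticks while a replica is still being filled would create
    /// gaps in its clock sequence, and a later WAL delta recovery would then
    /// silently miss the operations covered by those gaps.
    const fn is_ignore_local_clocks(self) -> bool {
        matches!(self, Self::UnderRecovery)
    }
}

fn main() {
    for state in [
        ReplicaStateSketch::Active,
        ReplicaStateSketch::UnderRecovery,
        ReplicaStateSketch::Dead,
    ] {
        println!(
            "{state:?} ignores local clocks: {}",
            state.is_ignore_local_clocks()
        );
    }
}
```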
49 changes: 31 additions & 18 deletions lib/collection/src/shards/transfer/stream_records.rs
@@ -35,7 +35,6 @@ pub(super) async fn transfer_stream_records(
collection_id: &CollectionId,
) -> CollectionResult<()> {
let remote_peer_id = remote_shard.peer_id;
let cutoff;

log::debug!("Starting shard {shard_id} transfer to peer {remote_peer_id} by streaming records");

@@ -71,9 +70,6 @@ pub(super) async fn transfer_stream_records(
progress.lock().points_total = count_result.count;

replica_set.transfer_indexes().await?;

// Take our last seen clocks as cutoff point right before doing content batch transfers
cutoff = replica_set.shard_recovery_point().await?;
}

// Transfer contents batch by batch
@@ -110,20 +106,37 @@ pub(super) async fn transfer_stream_records(
}
}

// Update cutoff point on remote shard, disallow recovery before it
//
// We provide it our last seen clocks from just before transferring the content batches, and
// not our current last seen clocks. We're sure that after the transfer the remote must have
// seen all point data for those clocks. While we cannot guarantee the remote has all point
// data for our current last seen clocks because some operations may still be in flight.
// This is a trade-off between being conservative and being too conservative.
//
// We must send a cutoff point to the remote so it can learn about all the clocks that exist.
// If we don't do this it is possible the remote will never see a clock, breaking all future
// WAL delta transfers.
remote_shard
.update_shard_cutoff_point(collection_id, remote_shard.id, &cutoff)
.await?;
// Update cutoff point on remote shard, disallow recovery before our current last seen clocks
{
let shard_holder = shard_holder.read().await;
let Some(replica_set) = shard_holder.get_shard(shard_id) else {
// Forward proxy gone?!
// That would be a programming error.
return Err(CollectionError::service_error(format!(
"Shard {shard_id} is not found"
)));
};

let cutoff = replica_set.shard_recovery_point().await?;
let result = remote_shard
.update_shard_cutoff_point(collection_id, remote_shard.id, &cutoff)
.await;

// Warn and ignore if remote shard is running an older version, error otherwise
// TODO: this is fragile, improve this with stricter matches/checks
match result {
// This string match is fragile but there does not seem to be a better way
Err(err)
if err.to_string().starts_with(
"Service internal error: Tonic status error: status: Unimplemented",
) =>
{
log::warn!("Cannot update cutoff point on remote shard because it is running an older version, ignoring: {err}");
}
Err(err) => return Err(err),
Ok(()) => {}
}
}
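For illustration, the compatibility handling above can be isolated into a standalone helper. The helper name, the plain `String` error type, and `eprintln!` (standing in for `log::warn!`) are assumptions made to keep the sketch self-contained; the real code matches on the `CollectionError` inline, as shown in the diff.

```rust
// Hypothetical helper, not part of the diff: downgrade an "Unimplemented"
// response from an older remote to a warning and carry on, propagate any
// other error. String stands in for CollectionError here.
fn tolerate_old_remote(result: Result<(), String>) -> Result<(), String> {
    match result {
        // The gRPC status arrives stringified, so a prefix match is the only
        // available signal; this mirrors the fragile check noted in the diff.
        Err(err)
            if err.starts_with(
                "Service internal error: Tonic status error: status: Unimplemented",
            ) =>
        {
            // Stand-in for log::warn! in the real code.
            eprintln!("remote shard runs an older version, ignoring: {err}");
            Ok(())
        }
        other => other,
    }
}

fn main() {
    let simulated = Err(String::from(
        "Service internal error: Tonic status error: status: Unimplemented, ...",
    ));
    assert!(tolerate_old_remote(simulated).is_ok());
    assert!(tolerate_old_remote(Err(String::from("some other error"))).is_err());
}
```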

// Synchronize all nodes
await_consensus_sync(consensus, &channel_service).await;
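To put the new cutoff handling in context, here is a condensed, synchronous outline of the transfer order this revert restores. Every type and method below is a simplified stand-in rather than Qdrant's API (the real transfer is async and reads the replica set through the shard holder); the point is only the ordering: indexes first, then content batches, then the recovery point as the cutoff, whereas the reverted experiment snapshotted the clocks before the batch loop.

```rust
// Condensed, synchronous sketch; every type here is a stand-in, not Qdrant's API.
struct LocalReplica {
    batches: Vec<Vec<u64>>, // point IDs, standing in for real point batches
}
struct RemoteShard;
struct RecoveryPoint;

impl LocalReplica {
    fn transfer_indexes(&self) -> Result<(), String> {
        Ok(())
    }
    fn next_batch(&mut self) -> Option<Vec<u64>> {
        self.batches.pop()
    }
    fn shard_recovery_point(&self) -> Result<RecoveryPoint, String> {
        Ok(RecoveryPoint)
    }
}

impl RemoteShard {
    fn upsert_batch(&self, _batch: Vec<u64>) -> Result<(), String> {
        Ok(())
    }
    fn update_shard_cutoff_point(&self, _cutoff: &RecoveryPoint) -> Result<(), String> {
        Ok(())
    }
}

fn transfer_stream_records_outline(
    local: &mut LocalReplica,
    remote: &RemoteShard,
) -> Result<(), String> {
    // 1. Transfer index configuration.
    local.transfer_indexes()?;
    // 2. Stream the content batch by batch.
    while let Some(batch) = local.next_batch() {
        remote.upsert_batch(batch)?;
    }
    // 3. Only now read the recovery point and push it to the remote as the
    //    cutoff. The reverted experiment took this snapshot before step 2
    //    instead.
    let cutoff = local.shard_recovery_point()?;
    remote.update_shard_cutoff_point(&cutoff)
}

fn main() -> Result<(), String> {
    let mut local = LocalReplica {
        batches: vec![vec![1, 2, 3], vec![4, 5]],
    };
    transfer_stream_records_outline(&mut local, &RemoteShard)
}
```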