Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Buffered connection management for collator-protocol #6022

Merged
merged 23 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Only reconnect on new advertisements
  • Loading branch information
slumber committed Sep 17, 2022
commit 968160eff5b6514db1fa80d4f0b603254e787b2e
44 changes: 7 additions & 37 deletions node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,14 @@ use polkadot_primitives::v2::{
GroupIndex, Hash, Id as ParaId, SessionIndex,
};

use self::validators_buffer::ResetBitDelay;

use super::LOG_TARGET;
use crate::error::{log_error, Error, FatalError, Result};
use fatality::Split;

mod metrics;
mod validators_buffer;

use validators_buffer::{ValidatorGroupsBuffer, RESET_BIT_DELAY, VALIDATORS_BUFFER_CAPACITY};
use validators_buffer::{ValidatorGroupsBuffer, VALIDATORS_BUFFER_CAPACITY};

pub use metrics::Metrics;

Expand Down Expand Up @@ -213,14 +211,6 @@ struct State {
/// Tracks which validators we want to stay connected to.
validator_groups_buf: ValidatorGroupsBuffer<VALIDATORS_BUFFER_CAPACITY>,

/// A set of futures that notify the subsystem to reset validator's bit in
/// a buffer with respect to advertisement.
///
/// This doesn't necessarily mean that a validator will be disconnected
/// as there may exist several collations in our view this validator is interested
/// in.
reset_bit_delays: FuturesUnordered<ResetBitDelay>,

/// Metrics.
metrics: Metrics,

Expand Down Expand Up @@ -253,7 +243,6 @@ impl State {
our_validators_groups: Default::default(),
peer_ids: Default::default(),
validator_groups_buf: ValidatorGroupsBuffer::new(),
reset_bit_delays: Default::default(),
waiting_collation_fetches: Default::default(),
active_collation_fetches: Default::default(),
}
Expand Down Expand Up @@ -360,7 +349,7 @@ async fn distribute_collation<Context>(
);

// Update a set of connected validators if necessary.
reconnect_to_validators(ctx, &state.validator_groups_buf).await;
connect_to_validators(ctx, &state.validator_groups_buf).await;

state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());

Expand Down Expand Up @@ -472,13 +461,12 @@ async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: PeerId) {
}

/// Updates a set of connected validators based on their advertisement-bits
/// in a buffer.
/// in a validators buffer.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn reconnect_to_validators<Context>(
async fn connect_to_validators<Context>(
ctx: &mut Context,
validator_groups_buf: &ValidatorGroupsBuffer<VALIDATORS_BUFFER_CAPACITY>,
) {
// Validators not present in this vec are disconnected.
let validator_ids = validator_groups_buf.validators_to_connect();
slumber marked this conversation as resolved.
Show resolved Hide resolved

// ignore address resolution failure
Expand Down Expand Up @@ -547,16 +535,6 @@ async fn advertise_collation<Context>(
))
.await;

// If a validator doesn't fetch a collation within a timeout,
// reset its bit anyway.
if let Some(authority_ids) = state.peer_ids.get(&peer) {
state.reset_bit_delays.push(ResetBitDelay::new(
relay_parent,
authority_ids.clone(),
RESET_BIT_DELAY,
));
}

if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) {
validators.advertised_to_peer(&state.peer_ids, &peer);
}
Expand Down Expand Up @@ -967,11 +945,9 @@ pub(crate) async fn run<Context>(
FromOrchestra::Signal(Conclude) => return Ok(()),
},
(relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => {
// Schedule a bit reset for this peer.
if let Some(authority_ids) = state.peer_ids.get(&peer_id) {
state.reset_bit_delays.push(ResetBitDelay::new(
relay_parent, authority_ids.clone(), RESET_BIT_DELAY
));
for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() {
// This peer is no longer interested in this relay parent.
state.validator_groups_buf.reset_validator_bit(relay_parent, authority_id);
slumber marked this conversation as resolved.
Show resolved Hide resolved
}

let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
Expand All @@ -994,12 +970,6 @@ pub(crate) async fn run<Context>(
send_collation(&mut state, next, receipt, pov).await;
}
},
(relay_parent, authority_ids) = state.reset_bit_delays.select_next_some() => {
for authority_id in authority_ids {
state.validator_groups_buf.reset_validator_bit(relay_parent, &authority_id);
}
reconnect_to_validators(&mut ctx, &state.validator_groups_buf).await;
}
in_req = recv_req => {
match in_req {
Ok(req) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,21 @@
//! 1 indicating we want to be connected to i-th validator in a buffer, 0 otherwise.
//!
//! The bit is set to 1 on new advertisements, and back to 0 when a collation is fetched
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually not implemented here. This is just a usage pattern we envision, should be documented as such.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In particular this module does not (and should not) care about why we are clearing a bit. It could be because the collation was fetched or because a timeout happened. Instead I would love more documentation about what this module actually implements and why, what invariants it keeps, ...

//! by a validator or the timeout has been hit.
//! by a validator.
//!
//! The bitwise OR over known advertisements gives us validators indices for connection request.

use std::{
collections::{HashMap, HashSet, VecDeque},
future::Future,
collections::{HashMap, VecDeque},
ops::Range,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use bitvec::{bitvec, vec::BitVec};
use futures::FutureExt;

use polkadot_primitives::v2::{AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex};

pub const VALIDATORS_BUFFER_CAPACITY: usize = 3;

/// Validator bits are only reset after a delay, to mitigate
/// the risk of disconnecting from the same group throughout rotation.
pub const RESET_BIT_DELAY: Duration = Duration::from_secs(12);

/// Unique identifier of a validators group.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
struct ValidatorsGroupInfo {
Expand Down Expand Up @@ -176,32 +167,6 @@ impl<const N: usize> ValidatorGroupsBuffer<N> {
}
}

/// A timer future that carries a relay parent together with a set of
/// authority ids.
///
/// Once the inner delay has elapsed, the future resolves to the stored
/// `(relay_parent, authority_ids)` pair.
pub struct ResetBitDelay {
/// Timer that decides when this future becomes ready.
fut: futures_timer::Delay,
/// Relay parent handed back when the future resolves.
relay_parent: Hash,
/// Authority ids handed back when the future resolves; moved out of the
/// struct (leaving an empty set behind) at that point.
authority_ids: HashSet<AuthorityDiscoveryId>,
}

impl ResetBitDelay {
    /// Builds a future that becomes ready after `delay` and then yields
    /// `relay_parent` together with `authority_ids`.
    pub fn new(
        relay_parent: Hash,
        authority_ids: HashSet<AuthorityDiscoveryId>,
        delay: Duration,
    ) -> Self {
        // The timer is created here, at construction time.
        let fut = futures_timer::Delay::new(delay);
        Self { fut, relay_parent, authority_ids }
    }
}

impl Future for ResetBitDelay {
    /// The relay parent and the authority ids this delay was created with.
    type Output = (Hash, HashSet<AuthorityDiscoveryId>);

    /// Polls the inner timer; when it fires, hands out the stored relay parent
    /// and moves the authority ids out, leaving an empty set in their place.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.fut.poll_unpin(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(_) => {
                let relay_parent = self.relay_parent;
                let authority_ids = std::mem::take(&mut self.authority_ids);
                Poll::Ready((relay_parent, authority_ids))
            },
        }
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down