Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Rework reconnect logic
Browse files Browse the repository at this point in the history
  • Loading branch information
slumber committed Oct 4, 2022
1 parent 89e6169 commit 2ed7f7a
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 48 deletions.
90 changes: 43 additions & 47 deletions node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::{
collections::{HashMap, HashSet, VecDeque},
pin::Pin,
time::{Duration, Instant},
time::Duration,
};

use futures::{
Expand Down Expand Up @@ -59,7 +59,10 @@ use fatality::Split;
mod metrics;
mod validators_buffer;

use validators_buffer::{ValidatorGroupsBuffer, VALIDATORS_BUFFER_CAPACITY};
use validators_buffer::{
ResetValidatorDelay, ValidatorGroupsBuffer, RESET_VALIDATOR_INTEREST_TIMEOUT,
VALIDATORS_BUFFER_CAPACITY,
};

pub use metrics::Metrics;

Expand All @@ -79,17 +82,6 @@ const COST_APPARENT_FLOOD: Rep =
/// For considerations on this value, see: https://github.com/paritytech/polkadot/issues/4386
const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150);

/// Ensure that collator issues a connection request at least once every this many seconds.
/// Usually it's done when advertising new collation. However, if the core stays occupied or
/// it's not our turn to produce a candidate, it's important to disconnect from previous
/// peers.
///
/// Validators are obtained from [`ValidatorGroupsBuffer::validators_to_connect`].
const RECONNECT_TIMEOUT: Duration = Duration::from_secs(12);

/// How often to check for reconnect timeout.
const RECONNECT_POLL: Duration = Duration::from_secs(1);

/// Info about validators we are currently connected to.
///
/// It keeps track to which validators we advertised our collation.
Expand Down Expand Up @@ -228,9 +220,11 @@ struct State {
/// Tracks which validators we want to stay connected to.
validator_groups_buf: ValidatorGroupsBuffer,

/// Timestamp of the last connection request to a non-empty list of validators,
/// `None` otherwise.
last_connected_at: Option<Instant>,
/// Timeouts for fetching collations.
///
/// Validators are expected to fetch a collation within a time limit,
/// otherwise we force reset their interest in an advertisement.
reset_validator_delays: FuturesUnordered<ResetValidatorDelay>,

/// Metrics.
metrics: Metrics,
Expand Down Expand Up @@ -264,7 +258,7 @@ impl State {
our_validators_groups: Default::default(),
peer_ids: Default::default(),
validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY),
last_connected_at: None,
reset_validator_delays: Default::default(),
waiting_collation_fetches: Default::default(),
active_collation_fetches: Default::default(),
}
Expand Down Expand Up @@ -376,7 +370,7 @@ async fn distribute_collation<Context>(
);

// Update a set of connected validators if necessary.
state.last_connected_at = connect_to_validators(ctx, &state.validator_groups_buf).await;
connect_to_validators(ctx, &state.validator_groups_buf).await;

state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());

Expand Down Expand Up @@ -487,19 +481,20 @@ async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: PeerId) {
}
}

/// Updates a set of connected validators based on their advertisement-bits
/// in a validators buffer.
///
/// Returns current timestamp if the connection request was non-empty, `None`
/// otherwise.
/// Updates the set of connected validators based on their advertisement
/// interests recorded in the validators buffer.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn connect_to_validators<Context>(
ctx: &mut Context,
validator_groups_buf: &ValidatorGroupsBuffer,
) -> Option<Instant> {
) {
let validator_ids = validator_groups_buf.validators_to_connect();
let is_disconnect = validator_ids.is_empty();

if is_disconnect {
gum::debug!(target: LOG_TARGET, "Disconnected from all validators");
}

// ignore address resolution failure
// will reissue a new request on new collation
let (failed, _) = oneshot::channel();
Expand All @@ -509,8 +504,6 @@ async fn connect_to_validators<Context>(
failed,
})
.await;

(!is_disconnect).then_some(Instant::now())
}

/// Advertise collation to the given `peer`.
Expand Down Expand Up @@ -572,6 +565,16 @@ async fn advertise_collation<Context>(
validators.advertised_to_peer(&state.peer_ids, &peer);
}

// Make sure to disconnect from the validator if the collation
// wasn't requested within the timeout.
if let Some(authority_ids) = state.peer_ids.get(&peer) {
state.reset_validator_delays.push(ResetValidatorDelay::new(
relay_parent,
authority_ids.clone(),
RESET_VALIDATOR_INTEREST_TIMEOUT,
));
}

state.metrics.on_advertisment_made();
}

Expand Down Expand Up @@ -889,7 +892,7 @@ async fn handle_network_msg<Context>(
},
OurViewChange(view) => {
gum::trace!(target: LOG_TARGET, ?view, "Own view change");
handle_our_view_change(state, view).await?;
handle_our_view_change(ctx, state, view).await?;
},
PeerMessage(remote, Versioned::V1(msg)) => {
handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
Expand All @@ -903,7 +906,12 @@ async fn handle_network_msg<Context>(
}

/// Handles our view changes.
async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> {
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_our_view_change<Context>(
ctx: &mut Context,
state: &mut State,
view: OurView,
) -> Result<()> {
for removed in state.view.difference(&view) {
gum::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed.");

Expand Down Expand Up @@ -938,6 +946,8 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()>
}

state.view = view;
// Update a set of connected validators if necessary.
connect_to_validators(ctx, &state.validator_groups_buf).await;

Ok(())
}
Expand All @@ -956,9 +966,6 @@ pub(crate) async fn run<Context>(
let mut state = State::new(local_peer_id, collator_pair, metrics);
let mut runtime = RuntimeInfo::new(None);

let reconnect_stream = super::tick_stream(RECONNECT_POLL);
pin_mut!(reconnect_stream);

loop {
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
pin_mut!(recv_req);
Expand Down Expand Up @@ -1013,22 +1020,11 @@ pub(crate) async fn run<Context>(
send_collation(&mut state, next, receipt, pov).await;
}
},
_ = reconnect_stream.next() => {
let now = Instant::now();
if state
.last_connected_at
.map_or(false, |timestamp| now - timestamp > RECONNECT_TIMEOUT)
{
// Returns `None` if connection request is empty.
state.last_connected_at =
connect_to_validators(&mut ctx, &state.validator_groups_buf).await;

gum::debug!(
target: LOG_TARGET,
timeout = ?RECONNECT_TIMEOUT,
"Timeout hit, sent a connection request. Disconnected from all validators = {}",
state.last_connected_at.is_none(),
);
(relay_parent, authority_ids) = state.reset_validator_delays.select_next_some() => {
for authority_id in &authority_ids {
// Does nothing if the collation was already fetched or
// went out of view.
state.validator_groups_buf.reset_validator_interest(relay_parent, authority_id);
}
},
in_req = recv_req => {
Expand Down
10 changes: 10 additions & 0 deletions node/network/collator-protocol/src/collator_side/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ impl TestState {
)),
)
.await;

assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToValidators { .. })
);
}
}

Expand Down Expand Up @@ -294,6 +299,11 @@ async fn setup_system(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
])),
)
.await;

assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToValidators { .. })
);
}

/// Result of [`distribute_collation`]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@
//! The bitwise OR over known advertisements gives us validators indices for connection request.
use std::{
collections::{HashMap, VecDeque},
collections::{HashMap, HashSet, VecDeque},
future::Future,
num::NonZeroUsize,
ops::Range,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use bitvec::{bitvec, vec::BitVec};
use futures::FutureExt;

use polkadot_primitives::v2::{AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex};

Expand Down Expand Up @@ -212,6 +217,46 @@ impl ValidatorGroupsBuffer {
}
}

/// Time limit for a validator to fetch an advertised collation.
///
/// If an advertised collation is not fetched by the validator within this timeout,
/// we should force-reset its interest with respect to the relay parent.
pub const RESET_VALIDATOR_INTEREST_TIMEOUT: Duration = Duration::from_secs(6);

/// Future that, when awaited, returns the advertisement's relay parent together
/// with the discovery ids of the validator that should no longer be interested
/// in it.
///
/// If a relay parent doesn't go out of view and the validator hasn't
/// fetched a collation after some timeout, we shouldn't attempt to connect
/// to this peer anymore. For example, when a validator already fetched PoV
/// from another peer.
pub struct ResetValidatorDelay {
/// Relay parent of the advertisement; returned when the future resolves.
relay_parent: Hash,
/// Discovery ids of the validator; moved out of the struct when the future resolves.
authority_ids: HashSet<AuthorityDiscoveryId>,
/// Delay future; this future stays pending until it completes.
fut: futures_timer::Delay,
}

impl ResetValidatorDelay {
    /// Creates a new delay for the given advertisement.
    ///
    /// `relay_parent` and `authority_ids` are stored as-is and handed back
    /// when the future resolves; `delay` configures the underlying timer.
    pub fn new(
        relay_parent: Hash,
        authority_ids: HashSet<AuthorityDiscoveryId>,
        delay: Duration,
    ) -> Self {
        let fut = futures_timer::Delay::new(delay);

        Self { relay_parent, authority_ids, fut }
    }
}

impl Future for ResetValidatorDelay {
    /// Relay parent of the advertisement and the validator's discovery ids.
    type Output = (Hash, HashSet<AuthorityDiscoveryId>);

    /// Stays pending until the inner delay completes, then yields the stored
    /// relay parent and authority ids.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Nothing to report while the timer is still running.
        if self.fut.poll_unpin(cx).is_pending() {
            return Poll::Pending
        }

        let relay_parent = self.relay_parent;
        // `poll` only has `&mut` access, so move the set out and leave an
        // empty one behind instead of cloning it.
        let authority_ids = std::mem::take(&mut self.authority_ids);

        Poll::Ready((relay_parent, authority_ids))
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 2ed7f7a

Please sign in to comment.