Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Attempt to relieve pressure on mpsc_network_worker (#13725)
Browse files Browse the repository at this point in the history
* Attempt to relieve pressure on `mpsc_network_worker`

`SyncingEngine` interacting with `NetworkWorker` can put a lot of strain
on the channel if the number of inbound connections is high. This is
because `SyncingEngine` is notified of each inbound substream which it
then can either accept or reject and this causes a lot of message
exchange on the already busy channel.

Use a direct channel pair between `Protocol` and `SyncingEngine`
to exchange notification events. It is a temporary change to alleviate
the problems caused by syncing being an independent protocol and the
fix will be removed once `NotificationService` is implemented.

* Apply review comments

* fixes

* trigger ci

* Fix tests

Verify that both peers have a connection now that the validation goes
through `SyncingEngine`. Depending on how the tasks are scheduled,
one of them might not have the peer registered in `SyncingEngine` at which
point the test won't make any progress because block announcement received
from an unknown peer is discarded.

Move polling of `ChainSync` at the end of the function so that if a block
announcement causes a block request to be sent, that can be sent in the
same call to `SyncingEngine::poll()`.

---------

Co-authored-by: parity-processbot <>
  • Loading branch information
altonen authored Mar 30, 2023
1 parent 14f1a39 commit 4499829
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 153 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
//! See the documentation of [`Params`].
pub use crate::{
protocol::NotificationsSink,
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
Expand All @@ -31,7 +32,12 @@ pub use crate::{
use codec::Encode;
use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId};
use prometheus_endpoint::Registry;
pub use sc_network_common::{role::Role, sync::warp::WarpSyncProvider, ExHashT};
pub use sc_network_common::{
role::{Role, Roles},
sync::warp::WarpSyncProvider,
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
use zeroize::Zeroize;

use sp_runtime::traits::Block as BlockT;
Expand Down Expand Up @@ -714,6 +720,9 @@ pub struct Params<Block: BlockT> {
/// Block announce protocol configuration
pub block_announce_config: NonDefaultSetConfig,

/// TX channel for direct communication with `SyncingEngine` and `Protocol`.
pub tx: TracingUnboundedSender<crate::event::SyncEvent<Block>>,

/// Request response protocol configurations
pub request_response_protocol_configs: Vec<RequestResponseConfig>,
}
Expand Down
47 changes: 45 additions & 2 deletions client/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
//! Network event types. These are are not the part of the protocol, but rather
//! events that happen on the network like DHT get/put results received.
use crate::types::ProtocolName;
use crate::{types::ProtocolName, NotificationsSink};

use bytes::Bytes;
use futures::channel::oneshot;
use libp2p::{core::PeerId, kad::record::Key};

use sc_network_common::role::ObservedRole;
use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake};
use sp_runtime::traits::Block as BlockT;

/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -90,3 +92,44 @@ pub enum Event {
messages: Vec<(ProtocolName, Bytes)>,
},
}

/// Event sent to `SyncingEngine`
// TODO: remove once `NotificationService` is implemented.
pub enum SyncEvent<B: BlockT> {
/// Opened a substream with the given node with the given notifications protocol.
///
/// The protocol is always one of the notification protocols that have been registered.
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// Received handshake.
received_handshake: BlockAnnouncesHandshake<B>,
/// Notification sink.
sink: NotificationsSink,
/// Channel for reporting accept/reject of the substream.
tx: oneshot::Sender<bool>,
},

/// Closed a substream with the given node. Always matches a corresponding previous
/// `NotificationStreamOpened` message.
NotificationStreamClosed {
/// Node we closed the substream with.
remote: PeerId,
},

/// Notification sink was replaced.
NotificationSinkReplaced {
/// Node we closed the substream with.
remote: PeerId,
/// Notification sink.
sink: NotificationsSink,
},

/// Received one or more messages from the given node using the given protocol.
NotificationsReceived {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message.
messages: Vec<Bytes>,
},
}
6 changes: 3 additions & 3 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub mod request_responses;
pub mod types;
pub mod utils;

pub use event::{DhtEvent, Event};
pub use event::{DhtEvent, Event, SyncEvent};
#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use request_responses::{IfDisconnected, RequestFailure, RequestResponseConfig};
Expand All @@ -278,8 +278,8 @@ pub use service::{
NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT,
NotificationSenderError, NotificationSenderReady,
},
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, OutboundFailure,
PublicKey,
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink,
OutboundFailure, PublicKey,
};
pub use types::ProtocolName;

Expand Down
108 changes: 90 additions & 18 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{

use bytes::Bytes;
use codec::{DecodeAll, Encode};
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use libp2p::{
core::connection::ConnectionId,
swarm::{
Expand All @@ -35,11 +36,14 @@ use libp2p::{
use log::{debug, error, warn};

use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;

use std::{
collections::{HashMap, HashSet, VecDeque},
future::Future,
iter,
pin::Pin,
task::Poll,
};

Expand Down Expand Up @@ -68,6 +72,9 @@ mod rep {
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}

type PendingSyncSubstreamValidation =
Pin<Box<dyn Future<Output = Result<(PeerId, Roles), PeerId>> + Send>>;

// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT> {
/// Pending list of messages to return from `poll` as a priority.
Expand All @@ -87,6 +94,8 @@ pub struct Protocol<B: BlockT> {
bad_handshake_substreams: HashSet<(PeerId, sc_peerset::SetId)>,
/// Connected peers.
peers: HashMap<PeerId, Roles>,
sync_substream_validations: FuturesUnordered<PendingSyncSubstreamValidation>,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
_marker: std::marker::PhantomData<B>,
}

Expand All @@ -96,6 +105,7 @@ impl<B: BlockT> Protocol<B> {
roles: Roles,
network_config: &config::NetworkConfiguration,
block_announces_protocol: config::NonDefaultSetConfig,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let mut known_addresses = Vec::new();

Expand Down Expand Up @@ -179,6 +189,8 @@ impl<B: BlockT> Protocol<B> {
.collect(),
bad_handshake_substreams: Default::default(),
peers: HashMap::new(),
sync_substream_validations: FuturesUnordered::new(),
tx,
// TODO: remove when `BlockAnnouncesHandshake` is moved away from `Protocol`
_marker: Default::default(),
};
Expand Down Expand Up @@ -418,6 +430,23 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }),
};

while let Poll::Ready(Some(validation_result)) =
self.sync_substream_validations.poll_next_unpin(cx)
{
match validation_result {
Ok((peer, roles)) => {
self.peers.insert(peer, roles);
},
Err(peer) => {
log::debug!(
target: "sub-libp2p",
"`SyncingEngine` rejected stream"
);
self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC);
},
}
}

let outcome = match event {
NotificationsOut::CustomProtocolOpen {
peer_id,
Expand All @@ -440,16 +469,29 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
best_hash: handshake.best_hash,
genesis_hash: handshake.genesis_hash,
};
self.peers.insert(peer_id, roles);

CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)].clone(),
negotiated_fallback,
received_handshake: handshake.encode(),
roles,
notifications_sink,
}
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
);
self.sync_substream_validations.push(Box::pin(async move {
match rx.await {
Ok(accepted) =>
if accepted {
Ok((peer_id, roles))
} else {
Err(peer_id)
},
Err(_) => Err(peer_id),
}
}));

CustomMessageOutcome::None
},
Ok(msg) => {
debug!(
Expand All @@ -469,15 +511,27 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
let roles = handshake.roles;
self.peers.insert(peer_id, roles);

CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)]
.clone(),
negotiated_fallback,
received_handshake,
roles,
notifications_sink,
}
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
);
self.sync_substream_validations.push(Box::pin(async move {
match rx.await {
Ok(accepted) =>
if accepted {
Ok((peer_id, roles))
} else {
Err(peer_id)
},
Err(_) => Err(peer_id),
}
}));
CustomMessageOutcome::None
},
Err(err2) => {
log::debug!(
Expand Down Expand Up @@ -535,6 +589,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } =>
if self.bad_handshake_substreams.contains(&(peer_id, set_id)) {
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationSinkReplaced {
remote: peer_id,
sink: notifications_sink,
});
CustomMessageOutcome::None
} else {
CustomMessageOutcome::NotificationStreamReplaced {
remote: peer_id,
Expand All @@ -548,6 +608,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
// handshake. The outer layers have never received an opening event about this
// substream, and consequently shouldn't receive a closing event either.
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationStreamClosed {
remote: peer_id,
});
self.peers.remove(&peer_id);
CustomMessageOutcome::None
} else {
CustomMessageOutcome::NotificationStreamClosed {
remote: peer_id,
Expand All @@ -558,6 +624,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
NotificationsOut::Notification { peer_id, set_id, message } => {
if self.bad_handshake_substreams.contains(&(peer_id, set_id)) {
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationsReceived {
remote: peer_id,
messages: vec![message.freeze()],
});
CustomMessageOutcome::None
} else {
let protocol_name = self.notification_protocols[usize::from(set_id)].clone();
CustomMessageOutcome::NotificationsReceived {
Expand Down
6 changes: 4 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
protocol::{self, NotificationsSink, NotifsHandlerError, Protocol, Ready},
protocol::{self, NotifsHandlerError, Protocol, Ready},
request_responses::{IfDisconnected, RequestFailure},
service::{
signature::{Signature, SigningError},
Expand Down Expand Up @@ -91,6 +91,7 @@ use std::{

pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey};
pub use protocol::NotificationsSink;

mod metrics;
mod out_events;
Expand Down Expand Up @@ -146,7 +147,7 @@ where
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new<Block: BlockT>(mut params: Params<Block>) -> Result<Self, Error> {
pub fn new(mut params: Params<B>) -> Result<Self, Error> {
// Private and public keys configuration.
let local_identity = params.network_config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
Expand Down Expand Up @@ -227,6 +228,7 @@ where
From::from(&params.role),
&params.network_config,
params.block_announce_config,
params.tx,
)?;

// List of multiaddresses that we know in the network.
Expand Down
Loading

1 comment on commit 4499829

@Polkadot-Forum
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Polkadot Forum. There might be relevant details there:

https://forum.polkadot.network/t/polkadot-release-analysis-v0-9-41-v0-9-42/2828/1

Please sign in to comment.