Skip to content

Commit

Permalink
Rewrite network protocol/service to use channels (paritytech#1340)
Browse files Browse the repository at this point in the history
* rewrite network protocol/service to use channels

* remove use of unwrap

* re-introduce with_spec

* remove unnecessary mut

* remove unused param

* improve with_spec, add with_gossip

* rename job to task

* style: re-add comma

* remove extra string allocs

* rename use of channel

* turn TODO into FIXME

* remove mut in match

* remove Self in new

* pass headers by value to network service

* remove network sender from service

* remove TODO

* better expect

* rationalize use of network sender in ondemand
  • Loading branch information
gterzian authored and bkchr committed Feb 6, 2019
1 parent be41ec8 commit 64cde6f
Show file tree
Hide file tree
Showing 19 changed files with 1,306 additions and 895 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions core/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,9 +780,7 @@ mod tests {
let environ = Arc::new(DummyFactory(client.clone()));
import_notifications.push(
client.import_notification_stream()
.take_while(|n| {
Ok(!(n.origin != BlockOrigin::Own && n.header.number() < &5))
})
.take_while(|n| Ok(!(n.origin != BlockOrigin::Own && n.header.number() < &5)))
.for_each(move |_| Ok(()))
);

Expand Down Expand Up @@ -816,7 +814,7 @@ mod tests {
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().sync();
net.lock().route_fast();
Ok(())
})
.map(|_| ())
Expand Down
32 changes: 21 additions & 11 deletions core/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ extern crate env_logger;
extern crate parity_codec_derive;

use futures::prelude::*;
use futures::sync::mpsc;
use futures::sync::{self, mpsc};
use client::{
BlockchainEvents, CallExecutor, Client, backend::Backend,
error::Error as ClientError,
Expand Down Expand Up @@ -249,18 +249,18 @@ pub trait Network<Block: BlockT>: Clone {
}

/// Bridge between NetworkService, gossiping consensus messages and Grandpa
pub struct NetworkBridge<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT> {
service: Arc<NetworkService<B, S, H>>
pub struct NetworkBridge<B: BlockT, S: network::specialization::NetworkSpecialization<B>> {
service: Arc<NetworkService<B, S>>
}

impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT> NetworkBridge<B, S, H> {
impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>> NetworkBridge<B, S> {
/// Create a new NetworkBridge to the given NetworkService
pub fn new(service: Arc<NetworkService<B, S, H>>) -> Self {
pub fn new(service: Arc<NetworkService<B, S>>) -> Self {
NetworkBridge { service }
}
}

impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT> Clone for NetworkBridge<B, S, H> {
impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Clone for NetworkBridge<B, S> {
fn clone(&self) -> Self {
NetworkBridge {
service: Arc::clone(&self.service)
Expand All @@ -276,10 +276,15 @@ fn commit_topic<B: BlockT>(set_id: u64) -> B::Hash {
<<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-COMMITS", set_id).as_bytes())
}

impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT> Network<B> for NetworkBridge<B, S, H> {
impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B> for NetworkBridge<B, S> {
type In = mpsc::UnboundedReceiver<ConsensusMessage>;
fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
self.service.consensus_gossip().write().messages_for(message_topic::<B>(round, set_id))
let (tx, rx) = sync::oneshot::channel();
self.service.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(message_topic::<B>(round, set_id));
let _ = tx.send(inner_rx);
});
rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully")
}

fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) {
Expand All @@ -289,16 +294,21 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT

fn drop_round_messages(&self, round: u64, set_id: u64) {
let topic = message_topic::<B>(round, set_id);
self.service.consensus_gossip().write().collect_garbage_for_topic(topic);
self.service.with_gossip(move |gossip, _| gossip.collect_garbage(|t| t == &topic));
}

fn drop_set_messages(&self, set_id: u64) {
let topic = commit_topic::<B>(set_id);
self.service.consensus_gossip().write().collect_garbage_for_topic(topic);
self.service.with_gossip(move |gossip, _| gossip.collect_garbage(|t| t == &topic));
}

fn commit_messages(&self, set_id: u64) -> Self::In {
self.service.consensus_gossip().write().messages_for(commit_topic::<B>(set_id))
let (tx, rx) = sync::oneshot::channel();
self.service.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(commit_topic::<B>(set_id));
let _ = tx.send(inner_rx);
});
rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully")
}

fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>) {
Expand Down
28 changes: 9 additions & 19 deletions core/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,7 @@ impl MessageRouting {
fn drop_messages(&self, topic: Hash) {
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
let mut gossip = peer.consensus_gossip().write();
peer.with_spec(move |_, _| {
gossip.collect_garbage_for_topic(topic);
});
peer.consensus_gossip_collect_garbage_for(topic);
}
}

Expand Down Expand Up @@ -192,10 +189,7 @@ impl Network<Block> for MessageRouting {
fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
let mut gossip = peer.consensus_gossip().write();
let messages = peer.with_spec(move |_, _| {
gossip.messages_for(make_topic(round, set_id))
});
let messages = peer.consensus_gossip_messages_for(make_topic(round, set_id));

let messages = messages.map_err(
move |_| panic!("Messages for round {} dropped too early", round)
Expand All @@ -205,9 +199,8 @@ impl Network<Block> for MessageRouting {
}

fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) {
let mut inner = self.inner.lock();
let inner = self.inner.lock();
inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), message, false);
inner.route_until_complete();
}

fn drop_round_messages(&self, round: u64, set_id: u64) {
Expand All @@ -223,10 +216,7 @@ impl Network<Block> for MessageRouting {
fn commit_messages(&self, set_id: u64) -> Self::In {
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
let mut gossip = peer.consensus_gossip().write();
let messages = peer.with_spec(move |_, _| {
gossip.messages_for(make_commit_topic(set_id))
});
let messages = peer.consensus_gossip_messages_for(make_commit_topic(set_id));

let messages = messages.map_err(
move |_| panic!("Commit messages for set {} dropped too early", set_id)
Expand All @@ -236,9 +226,8 @@ impl Network<Block> for MessageRouting {
}

fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>) {
let mut inner = self.inner.lock();
let inner = self.inner.lock();
inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message, false);
inner.route_until_complete();
}

fn announce(&self, _round: u64, _set_id: u64, _block: H256) {
Expand Down Expand Up @@ -420,7 +409,7 @@ fn run_to_completion(blocks: u64, net: Arc<Mutex<GrandpaTestNet>>, peers: &[Keyr
.map_err(|_| ());

let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| { net.lock().route_until_complete(); Ok(()) })
.for_each(move |_| { net.lock().route_fast(); Ok(()) })
.map(|_| ())
.map_err(|_| ());

Expand Down Expand Up @@ -506,7 +495,7 @@ fn finalize_3_voters_1_observer() {
.map_err(|_| ());

let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| { net.lock().route_until_complete(); Ok(()) })
.for_each(move |_| { net.lock().route_fast(); Ok(()) })
.map(|_| ())
.map_err(|_| ());

Expand Down Expand Up @@ -667,6 +656,7 @@ fn transition_3_voters_twice_1_observer() {
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().send_finality_notifications();
net.lock().route_fast();
Ok(())
})
.map(|_| ())
Expand Down Expand Up @@ -776,7 +766,7 @@ fn sync_justifications_on_change_blocks() {
// the last peer should get the justification by syncing from other peers
assert!(net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none());
while net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() {
net.lock().sync_steps(100);
net.lock().route_fast();
}
}

Expand Down
17 changes: 17 additions & 0 deletions core/network-libp2p/src/service_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ pub enum ServiceEvent {
protocol: ProtocolId,
/// Version of the protocol that was opened.
version: u8,
/// Node debug info
debug_info: String,
},

/// A custom protocol substream has been closed.
Expand All @@ -162,6 +164,8 @@ pub enum ServiceEvent {
node_index: NodeIndex,
/// Protocol that has been closed.
protocol: ProtocolId,
/// Node debug info
debug_info: String,
},

/// Custom protocol substreams have been closed.
Expand All @@ -172,6 +176,8 @@ pub enum ServiceEvent {
node_index: NodeIndex,
/// Protocols that have been closed.
protocols: Vec<ProtocolId>,
/// Node debug info
debug_info: String,
},

/// Receives a message on a custom protocol stream.
Expand Down Expand Up @@ -348,6 +354,15 @@ impl Service {
}
}

/// Human-readable debug description of a peer: its `PeerId` and the
/// address we are connected through, or `"unknown"` when either piece
/// of information is unavailable for this node index.
pub fn peer_debug_info(&self, who: NodeIndex) -> String {
	match (self.peer_id_of_node(who), self.node_endpoint(who)) {
		(Some(peer_id), Some(addr)) => format!("{:?} through {:?}", peer_id, addr),
		_ => "unknown".to_string(),
	}
}

/// Returns the `NodeIndex` of a peer, or assigns one if none exists.
fn index_of_peer_or_assign(&mut self, peer: PeerId, endpoint: ConnectedPoint) -> NodeIndex {
match self.index_by_id.entry(peer) {
Expand Down Expand Up @@ -385,6 +400,7 @@ impl Service {
node_index,
protocol: protocol_id,
version,
debug_info: self.peer_debug_info(node_index),
})))
}
Ok(Async::Ready(Some(BehaviourOut::CustomProtocolClosed { protocol_id, peer_id, result }))) => {
Expand All @@ -393,6 +409,7 @@ impl Service {
break Ok(Async::Ready(Some(ServiceEvent::ClosedCustomProtocol {
node_index,
protocol: protocol_id,
debug_info: self.peer_debug_info(node_index),
})))
}
Ok(Async::Ready(Some(BehaviourOut::CustomMessage { protocol_id, peer_id, data }))) => {
Expand Down
12 changes: 6 additions & 6 deletions core/network-libp2p/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,21 @@ impl NetworkConfiguration {
}

/// The severity of misbehaviour of a peer that is reported.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Severity<'a> {
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum Severity {
/// Peer is timing out. Could be bad connectivity or overload of work on either of our sides.
Timeout,
/// Peer has been notably useless. E.g. unable to answer a request that we might reasonably consider
/// it could answer.
Useless(&'a str),
Useless(String),
/// Peer has behaved in an invalid manner. This doesn't necessarily need to be Byzantine, but peer
/// must have taken concrete action in order to behave in such a way which is wantonly invalid.
Bad(&'a str),
Bad(String),
}

impl<'a> fmt::Display for Severity<'a> {
impl fmt::Display for Severity {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
match self {
Severity::Timeout => write!(fmt, "Timeout"),
Severity::Useless(r) => write!(fmt, "Useless ({})", r),
Severity::Bad(r) => write!(fmt, "Bad ({})", r),
Expand Down
1 change: 1 addition & 0 deletions core/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[lib]

[dependencies]
crossbeam-channel = "0.3.6"
log = "0.4"
parking_lot = "0.7.1"
error-chain = "0.12"
Expand Down
79 changes: 0 additions & 79 deletions core/network/src/io.rs

This file was deleted.

5 changes: 3 additions & 2 deletions core/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
//! Substrate-specific P2P networking: synchronizing blocks, propagating BFT messages.
//! Allows attachment of an optional subprotocol for chain-specific requests.
#[macro_use]
extern crate crossbeam_channel;
extern crate linked_hash_map;
extern crate lru_cache;
extern crate parking_lot;
Expand Down Expand Up @@ -51,7 +53,6 @@ mod service;
mod sync;
#[macro_use]
mod protocol;
mod io;
mod chain;
mod blocks;
mod on_demand;
Expand All @@ -65,7 +66,7 @@ pub mod specialization;
pub mod test;

pub use chain::Client as ClientHandle;
pub use service::{Service, FetchFuture, TransactionPool, ManageNetwork, SyncProvider, ExHashT};
pub use service::{Service, FetchFuture, TransactionPool, ManageNetwork, NetworkMsg, SyncProvider, ExHashT};
pub use protocol::{ProtocolStatus, PeerInfo, Context};
pub use sync::{Status as SyncStatus, SyncState};
pub use network_libp2p::{
Expand Down
Loading

0 comments on commit 64cde6f

Please sign in to comment.