Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/authority-discovery: Remove sentry node logic #7368

Merged
5 commits merged into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 5 additions & 17 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use node_primitives::Block;
use node_runtime::RuntimeApi;
use sc_service::{
config::{Role, Configuration}, error::{Error as ServiceError},
config::{Configuration}, error::{Error as ServiceError},
RpcHandlers, TaskManager,
};
use sp_inherents::InherentDataProviders;
Expand Down Expand Up @@ -257,21 +257,10 @@ pub fn new_full_base(
}

// Spawn authority discovery module.
if matches!(role, Role::Authority{..} | Role::Sentry {..}) {
let (sentries, authority_discovery_role) = match role {
sc_service::config::Role::Authority { ref sentry_nodes } => (
sentry_nodes.clone(),
sc_authority_discovery::Role::Authority (
keystore_container.keystore(),
),
),
sc_service::config::Role::Sentry {..} => (
vec![],
sc_authority_discovery::Role::Sentry,
),
_ => unreachable!("Due to outer matches! constraint; qed.")
};

if role.is_authority() {
let authority_discovery_role = sc_authority_discovery::Role::PublishAndDiscover(
keystore_container.keystore(),
);
let dht_event_stream = network.event_stream("authority-discovery")
.filter_map(|e| async move { match e {
Event::Dht(e) => Some(e),
Expand All @@ -280,7 +269,6 @@ pub fn new_full_base(
let (authority_discovery_worker, _service) = sc_authority_discovery::new_worker_and_service(
client.clone(),
network.clone(),
sentries,
Box::pin(dht_event_stream),
authority_discovery_role,
prometheus_registry.clone(),
Expand Down
7 changes: 4 additions & 3 deletions client/authority-discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use futures::channel::{mpsc, oneshot};
use futures::Stream;

use sc_client_api::blockchain::HeaderBackend;
use sc_network::{config::MultiaddrWithPeerId, DhtEvent, Multiaddr, PeerId};
use sc_network::{DhtEvent, Multiaddr, PeerId};
use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId};
use sp_runtime::traits::Block as BlockT;
use sp_api::ProvideRuntimeApi;
Expand All @@ -44,10 +44,11 @@ mod tests;
mod worker;

/// Create a new authority discovery [`Worker`] and [`Service`].
///
/// See the struct documentation of each for more details.
pub fn new_worker_and_service<Client, Network, Block, DhtEventStream>(
client: Arc<Client>,
network: Arc<Network>,
sentry_nodes: Vec<MultiaddrWithPeerId>,
dht_event_rx: DhtEventStream,
role: Role,
prometheus_registry: Option<prometheus_endpoint::Registry>,
Expand All @@ -62,7 +63,7 @@ where
let (to_worker, from_service) = mpsc::channel(0);

let worker = Worker::new(
from_service, client, network, sentry_nodes, dht_event_rx, role, prometheus_registry,
from_service, client, network, dht_event_rx, role, prometheus_registry,
);
let service = Service::new(to_worker);

Expand Down
6 changes: 1 addition & 5 deletions client/authority-discovery/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ impl Service {
/// [`crate::Worker`] failed.
///
/// [`Multiaddr`]s returned always include a [`PeerId`] via a
/// [`libp2p::core::multiaddr:Protocol::P2p`] component. [`Multiaddr`]s
/// might differ in their [`PeerId`], e.g. when each [`Multiaddr`]
/// represents a different sentry node. This might change once support for
/// sentry nodes is removed (see
/// https://github.com/paritytech/substrate/issues/6845).
/// [`libp2p::core::multiaddr:Protocol::P2p`] component.
pub async fn get_addresses_by_authority_id(&mut self, authority: AuthorityId) -> Option<Vec<Multiaddr>> {
ordian marked this conversation as resolved.
Show resolved Hide resolved
let (tx, rx) = oneshot::channel();

Expand Down
3 changes: 1 addition & 2 deletions client/authority-discovery/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ fn get_addresses_and_authority_id() {
let (mut worker, mut service) = new_worker_and_service(
test_api,
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
Role::PublishAndDiscover(key_store.into()),
None,
);
worker.inject_addresses(remote_authority_id.clone(), vec![remote_addr.clone()]);
Expand Down
128 changes: 39 additions & 89 deletions client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,13 @@ use futures_timer::Delay;

use addr_cache::AddrCache;
use codec::Decode;
use either::Either;
use libp2p::{core::multiaddr, multihash::Multihash};
use log::{debug, error, log_enabled};
use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register};
use prost::Message;
use rand::{seq::SliceRandom, thread_rng};
use sc_client_api::blockchain::HeaderBackend;
use sc_network::{
config::MultiaddrWithPeerId,
DhtEvent,
ExHashT,
Multiaddr,
Expand Down Expand Up @@ -72,68 +70,47 @@ const MAX_ADDRESSES_PER_AUTHORITY: usize = 10;
/// Maximum number of in-flight DHT lookups at any given point in time.
const MAX_IN_FLIGHT_LOOKUPS: usize = 8;

/// Role an authority discovery module can run as.
/// Role an authority discovery [`Worker`] can run as.
pub enum Role {
/// Actual authority as well as a reference to its key store.
Authority(Arc<dyn CryptoStore>),
/// Sentry node that guards an authority.
///
/// No reference to its key store needed, as sentry nodes don't have an identity to sign
/// addresses with in the first place.
Sentry,
/// Publish own addresses and discover addresses of others.
PublishAndDiscover(Arc<dyn CryptoStore>),
/// Discover addresses of others.
Discover,
}

/// A [`Worker`] makes a given authority discoverable and discovers other
/// authorities.
///
/// The [`Worker`] implements the Future trait. By
/// polling [`Worker`] an authority:

/// An authority discovery [`Worker`] can publish the local node's addresses as well as discover
/// those of other nodes via a Kademlia DHT.
///
/// 1. **Makes itself discoverable**
/// When constructed with [`Role::PublishAndDiscover`] a [`Worker`] will
///
/// 1. Retrieves its external addresses (including peer id) or the ones of
/// its sentry nodes.
/// 1. Retrieve its external addresses (including peer id).
///
/// 2. Signs the above.
/// 2. Get the list of keys owned by the local node participating in the current authority set.
///
/// 3. Puts the signature and the addresses on the libp2p Kademlia DHT.
/// 3. Sign the addresses with the keys.
///
/// 4. Put addresses and signature as a record with the authority id as a key on a Kademlia DHT.
///
/// 2. **Discovers other authorities**
/// When constructed with either [`Role::PublishAndDiscover`] or [`Role::Publish`] a [`Worker`] will
///
/// 1. Retrieves the current and next set of authorities.
/// 1. Retrieve the current and next set of authorities.
///
/// 2. Starts DHT queries for the ids of the authorities.
/// 2. Start DHT queries for the ids of the authorities.
///
/// 3. Validates the signatures of the retrieved key value pairs.
/// 3. Validate the signatures of the retrieved key value pairs.
///
/// 4. Adds the retrieved external addresses as priority nodes to the
/// peerset.
/// 4. Add the retrieved external addresses as priority nodes to the
/// network peerset.
///
/// When run as a sentry node, the [`Worker`] does not publish
/// any addresses to the DHT but still discovers validators and sentry nodes of
/// validators, i.e. only step 2 (Discovers other authorities) is executed.
pub struct Worker<Client, Network, Block, DhtEventStream>
where
Block: BlockT + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi<Block>>::Api: AuthorityDiscoveryApi<Block>,
{
/// Channel receiver for messages send by an [`Service`].
/// 5. Allow querying of the collected addresses via the [`crate::Service`].
pub struct Worker<Client, Network, Block, DhtEventStream> {
/// Channel receiver for messages send by a [`Service`].
from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,

client: Arc<Client>,

network: Arc<Network>,
/// List of sentry node public addresses.
//
// There are 3 states:
// - None: No addresses were specified.
// - Some(vec![]): Addresses were specified, but none could be parsed as proper
// Multiaddresses.
// - Some(vec![a, b, c, ...]): Valid addresses were specified.
sentry_nodes: Option<Vec<Multiaddr>>,
/// Channel we receive Dht events on.
dht_event_rx: DhtEventStream,

Expand Down Expand Up @@ -168,15 +145,11 @@ where
AuthorityDiscoveryApi<Block, Error = sp_blockchain::Error>,
DhtEventStream: Stream<Item = DhtEvent> + Unpin,
{
/// Return a new [`Worker`].
///
/// Note: When specifying `sentry_nodes` this module will not advertise the public addresses of
/// the node itself but only the public addresses of its sentry nodes.
/// Construct a [`Worker`].
pub(crate) fn new(
from_service: mpsc::Receiver<ServicetoWorkerMsg>,
client: Arc<Client>,
network: Arc<Network>,
sentry_nodes: Vec<MultiaddrWithPeerId>,
dht_event_rx: DhtEventStream,
role: Role,
prometheus_registry: Option<prometheus_endpoint::Registry>,
Expand Down Expand Up @@ -206,12 +179,6 @@ where
query_interval_duration,
);

let sentry_nodes = if !sentry_nodes.is_empty() {
Some(sentry_nodes.into_iter().map(|ma| ma.concat()).collect::<Vec<_>>())
} else {
None
};

let addr_cache = AddrCache::new();

let metrics = match prometheus_registry {
Expand All @@ -231,7 +198,6 @@ where
from_service: from_service.fuse(),
client,
network,
sentry_nodes,
dht_event_rx,
publish_interval,
query_interval,
Expand Down Expand Up @@ -312,33 +278,23 @@ where
}

fn addresses_to_publish(&self) -> impl ExactSizeIterator<Item = Multiaddr> {
match &self.sentry_nodes {
Some(addrs) => Either::Left(addrs.clone().into_iter()),
None => {
let peer_id: Multihash = self.network.local_peer_id().into();
Either::Right(
self.network.external_addresses()
.into_iter()
.map(move |a| {
if a.iter().any(|p| matches!(p, multiaddr::Protocol::P2p(_))) {
a
} else {
a.with(multiaddr::Protocol::P2p(peer_id.clone()))
}
}),
)
}
}
let peer_id: Multihash = self.network.local_peer_id().into();
self.network.external_addresses()
.into_iter()
.map(move |a| {
if a.iter().any(|p| matches!(p, multiaddr::Protocol::P2p(_))) {
a
} else {
a.with(multiaddr::Protocol::P2p(peer_id.clone()))
}
})
}

/// Publish either our own or if specified the public addresses of our sentry nodes.
/// Publish own public addresses.
async fn publish_ext_addresses(&mut self) -> Result<()> {
let key_store = match &self.role {
Role::Authority(key_store) => key_store,
// Only authority nodes can put addresses (their own or the ones of their sentry nodes)
// on the Dht. Sentry nodes don't have a known identity to authenticate such addresses,
// thus `publish_ext_addresses` becomes a no-op.
Role::Sentry => return Ok(()),
Role::PublishAndDiscover(key_store) => key_store,
Role::Discover => return Ok(()),
};

let addresses = self.addresses_to_publish();
Expand Down Expand Up @@ -393,12 +349,12 @@ where
let id = BlockId::hash(self.client.info().best_hash);

let local_keys = match &self.role {
Role::Authority(key_store) => {
Role::PublishAndDiscover(key_store) => {
key_store.sr25519_public_keys(
key_types::AUTHORITY_DISCOVERY
).await.into_iter().collect::<HashSet<_>>()
},
Role::Sentry => HashSet::new(),
Role::Discover => HashSet::new(),
};

let mut authorities = self
Expand Down Expand Up @@ -795,13 +751,7 @@ impl Metrics {

// Helper functions for unit testing.
#[cfg(test)]
impl<Block, Client, Network, DhtEventStream> Worker<Client, Network, Block, DhtEventStream>
where
Block: BlockT + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi<Block>>::Api: AuthorityDiscoveryApi<Block>,
{
impl<Block, Client, Network, DhtEventStream> Worker<Client, Network, Block, DhtEventStream> {
pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
self.addr_cache.insert(authority, addresses);
}
Expand Down
Loading