diff --git a/.maintain/sentry-node/docker-compose.yml b/.maintain/sentry-node/docker-compose.yml index dd2aee6995473ad2f4204180568313fe8d271a39..37f0bea6b49856db4da2640742fc12bb2353ea78 100644 --- a/.maintain/sentry-node/docker-compose.yml +++ b/.maintain/sentry-node/docker-compose.yml @@ -38,13 +38,12 @@ services: - "--base-path" - "/tmp/alice" - "--chain=local" - - "--key" - - "//Alice" - "--port" - "30333" - "--validator" - - "--name" - - "AlicesNode" + - "--alice" + - "--sentry-nodes" + - "/dns4/sentry-a/tcp/30333/p2p/QmV7EhW6J6KgmNdr558RH1mPx2xGGznW7At4BhXzntRFsi" - "--reserved-nodes" - "/dns4/sentry-a/tcp/30333/p2p/QmV7EhW6J6KgmNdr558RH1mPx2xGGznW7At4BhXzntRFsi" # Not only bind to localhost. @@ -54,6 +53,8 @@ services: # - "sub-libp2p=trace" # - "--log" # - "afg=trace" + - "--log" + - "sub-authority-discovery=trace" - "--no-telemetry" - "--rpc-cors" - "all" @@ -74,28 +75,24 @@ services: - "--base-path" - "/tmp/sentry" - "--chain=local" - # Don't configure a key, as sentry-a is not a validator. - # - "--key" - # - "//Charlie" - "--port" - "30333" - # sentry-a is not a validator. - # - "--validator" - - "--name" - - "CharliesNode" + - "--charlie" - "--bootnodes" - "/dns4/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR" - "--bootnodes" - "/dns4/validator-b/tcp/30333/p2p/QmSVnNf9HwVMT1Y4cK1P6aoJcEZjmoTXpjKBmAABLMnZEk" + - "--reserved-nodes" + - "/dns4/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR" - "--no-telemetry" - "--rpc-cors" - "all" # Not only bind to localhost. - "--ws-external" - "--rpc-external" - # Make sure sentry-a still participates as a grandpa voter to forward - # grandpa finality gossip messages. - - "--grandpa-voter" + - "--log" + - "sub-authority-discovery=trace" + - "--sentry" validator-b: image: parity/substrate @@ -112,13 +109,10 @@ services: - "--base-path" - "/tmp/bob" - "--chain=local" - - "--key" - - "//Bob" - "--port" - "30333" - "--validator" - - "--name" - - "BobsNode" + - "--bob" - "--bootnodes" - "/dns4/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR" - "--bootnodes" @@ -129,6 +123,8 @@ services: # Not only bind to localhost. - "--ws-external" - "--rpc-external" + - "--log" + - "sub-authority-discovery=trace" ui: image: polkadot-js/apps diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 6dd08addc6d529da0ed731857a14e17c92cde81c..229050f818b67ec0fb53bf53aaa9a68d625cc90d 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -125,12 +125,14 @@ macro_rules! new_full { is_authority, force_authoring, name, - disable_grandpa + disable_grandpa, + sentry_nodes, ) = ( $config.roles.is_authority(), $config.force_authoring, $config.name.clone(), - $config.disable_grandpa + $config.disable_grandpa, + $config.network.sentry_nodes.clone(), ); // sentry nodes announce themselves as authorities to the network @@ -194,6 +196,7 @@ macro_rules! new_full { let authority_discovery = authority_discovery::AuthorityDiscovery::new( service.client(), service.network(), + sentry_nodes, service.keystore(), future03_dht_event_rx, ); diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs index cd436237306e429231b8bd0bda809827fa26d415..9fb82f30c684dc5e162bfd1cf1450f663db13350 100644 --- a/client/authority-discovery/src/lib.rs +++ b/client/authority-discovery/src/lib.rs @@ -18,8 +18,9 @@ //! Substrate authority discovery. //! -//! This crate enables Substrate authorities to directly connect to other authorities. [`AuthorityDiscovery`] implements -//! the Future trait. By polling [`AuthorityDiscovery`] an authority: +//! This crate enables Substrate authorities to directly connect to other authorities. +//! [`AuthorityDiscovery`] implements the Future trait. By polling [`AuthorityDiscovery`] an +//! authority: //! //! //! 1. **Makes itself discoverable** @@ -54,11 +55,14 @@ use futures::task::{Context, Poll}; use futures::{Future, FutureExt, Stream, StreamExt}; use futures_timer::Delay; -use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair}; +use authority_discovery_primitives::{ + AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair +}; use client_api::blockchain::HeaderBackend; use codec::{Decode, Encode}; use error::{Error, Result}; use log::{debug, error, log_enabled, warn}; +use libp2p::Multiaddr; use network::specialization::NetworkSpecialization; use network::{DhtEvent, ExHashT}; use primitives::crypto::{key_types, Pair}; @@ -78,7 +82,8 @@ mod schema { /// Upper bound estimation on how long one should wait before accessing the Kademlia DHT. const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30); -/// Name of the Substrate peerset priority group for authorities discovered through the authority discovery module. +/// Name of the Substrate peerset priority group for authorities discovered through the authority +/// discovery module. const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities"; /// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities. @@ -93,6 +98,14 @@ where client: Arc<Client>, network: Arc<Network>, + /// List of sentry node public addresses. + // + // There are 3 states: + // - None: No addresses were specified. + // - Some(vec![]): Addresses were specified, but none could be parsed as proper + // Multiaddresses. + // - Some(vec![a, b, c, ...]): Valid addresses were specified. + sentry_nodes: Option<Vec<Multiaddr>>, /// Channel we receive Dht events on. dht_event_rx: Pin<Box<dyn Stream<Item = DhtEvent> + Send>>, @@ -103,11 +116,12 @@ where /// Interval on which to query for addresses of other authorities. query_interval: Interval, - /// The network peerset interface for priority groups lets us only set an entire group, but we retrieve the - /// addresses of other authorities one by one from the network. To use the peerset interface we need to cache the - /// addresses and always overwrite the entire peerset priority group. To ensure this map doesn't grow indefinitely - /// `purge_old_authorities_from_cache` function is called each time we add a new entry. - address_cache: HashMap<AuthorityId, Vec<libp2p::Multiaddr>>, + /// The network peerset interface for priority groups lets us only set an entire group, but we + /// retrieve the addresses of other authorities one by one from the network. To use the peerset + /// interface we need to cache the addresses and always overwrite the entire peerset priority + /// group. To ensure this map doesn't grow indefinitely `purge_old_authorities_from_cache` + /// function is called each time we add a new entry. + address_cache: HashMap<AuthorityId, Vec<Multiaddr>>, phantom: PhantomData<Block>, } @@ -121,32 +135,54 @@ where Self: Future<Output = ()>, { /// Return a new authority discovery. + /// + /// Note: When specifying `sentry_nodes` this module will not advertise the public addresses of + /// the node itself but only the public addresses of its sentry nodes. pub fn new( client: Arc<Client>, network: Arc<Network>, + sentry_nodes: Vec<String>, key_store: BareCryptoStorePtr, dht_event_rx: Pin<Box<dyn Stream<Item = DhtEvent> + Send>>, ) -> Self { - // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node - // could restart at any point in time, one can not depend on the republishing process, thus publishing own - // external addresses should happen on an interval < 36h. + // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. + // Given that a node could restart at any point in time, one can not depend on the + // republishing process, thus publishing own external addresses should happen on an interval + // < 36h. let publish_interval = interval_at( Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME, Duration::from_secs(12 * 60 * 60), ); - // External addresses of other authorities can change at any given point in time. The interval on which to query - // for external addresses of other authorities is a trade off between efficiency and performance. + // External addresses of other authorities can change at any given point in time. The + // interval on which to query for external addresses of other authorities is a trade off + // between efficiency and performance. let query_interval = interval_at( Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME, Duration::from_secs(10 * 60), ); + let sentry_nodes = if !sentry_nodes.is_empty() { + Some(sentry_nodes.into_iter().filter_map(|a| match a.parse() { + Ok(addr) => Some(addr), + Err(e) => { + error!( + target: "sub-authority-discovery", + "Failed to parse sentry node public address '{:?}', continuing anyways.", e, + ); + None + } + }).collect()) + } else { + None + }; + let address_cache = HashMap::new(); AuthorityDiscovery { client, network, + sentry_nodes, dht_event_rx, key_store, publish_interval, @@ -156,18 +192,20 @@ where } } - fn publish_own_ext_addresses(&mut self) -> Result<()> { - let addresses = self - .network - .external_addresses() - .into_iter() - .map(|a| { - a.with(libp2p::core::multiaddr::Protocol::P2p( + /// Publish either our own or if specified the public addresses of our sentry nodes. + fn publish_ext_addresses(&mut self) -> Result<()> { + let addresses = match &self.sentry_nodes { + Some(addrs) => addrs.clone().into_iter() + .map(|a| a.to_vec()) + .collect(), + None => self.network.external_addresses() + .into_iter() + .map(|a| a.with(libp2p::core::multiaddr::Protocol::P2p( self.network.local_peer_id().into(), - )) - }) - .map(|a| a.to_vec()) - .collect(); + ))) + .map(|a| a.to_vec()) + .collect(), + }; let mut serialized_addresses = vec![]; schema::AuthorityAddresses { addresses } @@ -217,7 +255,10 @@ where DhtEvent::ValueFound(v) => { if log_enabled!(log::Level::Debug) { let hashes = v.iter().map(|(hash, _value)| hash.clone()); - debug!(target: "sub-authority-discovery", "Value for hash '{:?}' found on Dht.", hashes); + debug!( + target: "sub-authority-discovery", + "Value for hash '{:?}' found on Dht.", hashes, + ); } self.handle_dht_value_found_event(v)?; @@ -247,8 +288,9 @@ where let block_id = BlockId::hash(self.client.info().best_hash); - // From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure - // it is actually an authority, we match the hash against the hash of the authority id of all other authorities. + // From the Dht we only get the hashed authority id. In order to retrieve the actual + // authority id and to ensure it is actually an authority, we match the hash against the + // hash of the authority id of all other authorities. let authorities = self.client.runtime_api().authorities(&block_id)?; self.purge_old_authorities_from_cache(&authorities); @@ -267,7 +309,8 @@ where signature, addresses, } = schema::SignedAuthorityAddresses::decode(value).map_err(Error::DecodingProto)?; - let signature = AuthoritySignature::decode(&mut &signature[..]).map_err(Error::EncodingDecodingScale)?; + let signature = AuthoritySignature::decode(&mut &signature[..]) + .map_err(Error::EncodingDecodingScale)?; if !AuthorityPair::verify(&signature, &addresses, authority_id) { return Err(Error::VerifyingDhtPayload); @@ -293,7 +336,10 @@ where .flatten(), ); - debug!(target: "sub-authority-discovery", "Applying priority group {:#?} to peerset.", addresses); + debug!( + target: "sub-authority-discovery", + "Applying priority group {:#?} to peerset.", addresses, + ); self.network .set_priority_group(AUTHORITIES_PRIORITY_GROUP_NAME.to_string(), addresses) .map_err(Error::SettingPeersetPriorityGroup)?; @@ -368,20 +414,20 @@ where self.handle_dht_events(cx)?; if let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) { - // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the - // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the - // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval - // tick. + // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, + // in case one of the function calls within this block do a `return`, we don't call + // `interval.poll` again and thereby the underlying Tokio task is never registered + // with Tokio's Reactor to be woken up on the next interval tick. while let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {} - self.publish_own_ext_addresses()?; + self.publish_ext_addresses()?; } if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) { - // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the - // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the - // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval - // tick. + // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, + // in case one of the function calls within this block do a `return`, we don't call + // `interval.poll` again and thereby the underlying Tokio task is never registered + // with Tokio's Reactor to be woken up on the next interval tick. while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {} self.request_addresses_of_others()?; @@ -395,13 +441,15 @@ where Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e), }; - // Make sure to always return NotReady as this is a long running task with the same lifetime as the node itself. + // Make sure to always return NotReady as this is a long running task with the same lifetime + // as the node itself. Poll::Pending } } -/// NetworkProvider provides AuthorityDiscovery with all necessary hooks into the underlying Substrate networking. Using -/// this trait abstraction instead of NetworkService directly is necessary to unit test AuthorityDiscovery. +/// NetworkProvider provides AuthorityDiscovery with all necessary hooks into the underlying +/// Substrate networking. Using this trait abstraction instead of NetworkService directly is +/// necessary to unit test AuthorityDiscovery. pub trait NetworkProvider { /// Returns the local external addresses. fn external_addresses(&self) -> Vec<libp2p::Multiaddr>; @@ -457,14 +505,11 @@ fn hash_authority_id(id: &[u8]) -> Result<libp2p::kad::record::Key> { } fn interval_at(start: Instant, duration: Duration) -> Interval { - let stream = futures::stream::unfold((), move |_| { - let wait_time = start.saturating_duration_since(Instant::now()); + let stream = futures::stream::unfold(start, move |next| { + let time_until_next = next.saturating_duration_since(Instant::now()); - futures::future::join( - Delay::new(wait_time), - Delay::new(duration) - ).map(|_| Some(((), ()))) - }).map(drop); + Delay::new(time_until_next).map(move |_| Some(((), next + duration))) + }); Box::new(stream) } @@ -482,6 +527,67 @@ mod tests { use std::sync::{Arc, Mutex}; use test_client::runtime::Block; + #[test] + fn interval_at_with_start_now() { + let start = Instant::now(); + + let mut interval = interval_at( + std::time::Instant::now(), + std::time::Duration::from_secs(10), + ); + + futures::executor::block_on(async { + interval.next().await; + }); + + assert!( + Instant::now().saturating_duration_since(start) < Duration::from_secs(1), + "Expected low resolution instant interval to fire within less than a second.", + ); + } + + #[test] + fn interval_at_is_queuing_events() { + let start = Instant::now(); + + let interval = interval_at( + std::time::Instant::now(), + std::time::Duration::from_millis(10), + ); + + // Let's wait for 100ms, thus 10 elements should be queued up. + std::thread::sleep(Duration::from_millis(100)); + + futures::executor::block_on(async { + interval.take(10).collect::<Vec<()>>().await; + }); + + // Make sure we did not just wait for yet another 100ms (10 elements). + assert!( + Instant::now().saturating_duration_since(start) < Duration::from_millis(150), + "Expect interval to /queue/ events when not polled for a while.", + ); + } + + #[test] + fn interval_at_with_initial_delay() { + let start = Instant::now(); + + let mut interval = interval_at( + std::time::Instant::now() + Duration::from_millis(100), + std::time::Duration::from_secs(10), + ); + + futures::executor::block_on(async { + interval.next().await; + }); + + assert!( + Instant::now().saturating_duration_since(start) > Duration::from_millis(100), + "Expected interval with initial delay not to fire right away.", + ); + } + #[derive(Clone)] struct TestApi { authorities: Vec<AuthorityId>, @@ -612,7 +718,8 @@ mod tests { #[derive(Default)] struct TestNetwork { - // Whenever functions on `TestNetwork` are called, the function arguments are added to the vectors below. + // Whenever functions on `TestNetwork` are called, the function arguments are added to the + // vectors below. pub put_value_call: Arc<Mutex<Vec<(libp2p::kad::record::Key, Vec<u8>)>>>, pub get_value_call: Arc<Mutex<Vec<libp2p::kad::record::Key>>>, pub set_priority_group_call: Arc<Mutex<Vec<(String, HashSet<libp2p::Multiaddr>)>>>, @@ -645,17 +752,20 @@ mod tests { } #[test] - fn publish_own_ext_addresses_puts_record_on_dht() { + fn publish_ext_addresses_puts_record_on_dht() { let (_dht_event_tx, dht_event_rx) = channel(1000); let network: Arc<TestNetwork> = Arc::new(Default::default()); let key_store = KeyStore::new(); - let public = key_store.write().sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None).unwrap(); + let public = key_store.write() + .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None) + .unwrap(); let test_api = Arc::new(TestApi {authorities: vec![public.into()]}); - let mut authority_discovery = - AuthorityDiscovery::new(test_api, network.clone(), key_store, dht_event_rx.boxed()); + let mut authority_discovery = AuthorityDiscovery::new( + test_api, network.clone(), vec![], key_store, dht_event_rx.boxed(), + ); - authority_discovery.publish_own_ext_addresses().unwrap(); + authority_discovery.publish_ext_addresses().unwrap(); // Expect authority discovery to put a new record onto the dht. assert_eq!(network.put_value_call.lock().unwrap().len(), 1); @@ -676,8 +786,9 @@ mod tests { let network: Arc<TestNetwork> = Arc::new(Default::default()); let key_store = KeyStore::new(); - let mut authority_discovery = - AuthorityDiscovery::new(test_api, network.clone(), key_store, dht_event_rx.boxed()); + let mut authority_discovery = AuthorityDiscovery::new( + test_api, network.clone(), vec![], key_store, dht_event_rx.boxed(), + ); authority_discovery.request_addresses_of_others().unwrap(); @@ -695,8 +806,9 @@ mod tests { let network: Arc<TestNetwork> = Arc::new(Default::default()); let key_store = KeyStore::new(); - let mut authority_discovery = - AuthorityDiscovery::new(test_api, network.clone(), key_store, dht_event_rx.boxed()); + let mut authority_discovery = AuthorityDiscovery::new( + test_api, network.clone(), vec![], key_store, dht_event_rx.boxed(), + ); // Create sample dht event. diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index 25ec6a17b1af8ca368e6e49cca0495df68159212..0f0edc2ba036f09d9a85b19a7d87495c2796dd20 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -674,12 +674,14 @@ fn fill_network_configuration( config.boot_nodes.extend(cli.bootnodes.into_iter()); config.config_path = Some(config_path.to_string_lossy().into()); config.net_config_path = config.config_path.clone(); - config.reserved_nodes.extend(cli.reserved_nodes.into_iter()); + config.reserved_nodes.extend(cli.reserved_nodes.into_iter()); if cli.reserved_only { config.non_reserved_mode = NonReservedPeerMode::Deny; } + config.sentry_nodes.extend(cli.sentry_nodes.into_iter()); + for addr in cli.listen_addr.iter() { let addr = addr.parse().ok().ok_or(error::Error::InvalidListenMultiaddress)?; config.listen_addresses.push(addr); diff --git a/client/cli/src/params.rs b/client/cli/src/params.rs index 58d7cb3ca8a72ab64ba7b9254e1e70864303fa8a..d81abaa7248eeff70748a138de8ab2330aae0a5d 100644 --- a/client/cli/src/params.rs +++ b/client/cli/src/params.rs @@ -177,6 +177,14 @@ pub struct NetworkConfigurationParams { #[structopt(long = "reserved-only")] pub reserved_only: bool, + /// Specify a list of sentry node public addresses. + #[structopt( + long = "sentry-nodes", + value_name = "URL", + conflicts_with_all = &[ "sentry" ] + )] + pub sentry_nodes: Vec<String>, + /// Listen on this multiaddress. #[structopt(long = "listen-addr", value_name = "LISTEN_ADDR")] pub listen_addr: Vec<String>, diff --git a/client/network/src/config.rs b/client/network/src/config.rs index a59ab97887296c2be73e2147d02cdd8cb9ff7161..9a55be7fe9ebdbdadfc25709761ffe809657c230 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -255,6 +255,8 @@ pub struct NetworkConfiguration { pub reserved_nodes: Vec<String>, /// The non-reserved peer mode. pub non_reserved_mode: NonReservedPeerMode, + /// List of sentry node public addresses. + pub sentry_nodes: Vec<String>, /// Client identifier. Sent over the wire for debugging purposes. pub client_version: String, /// Name of the node. Sent over the wire for debugging purposes. @@ -278,6 +280,7 @@ impl Default for NetworkConfiguration { out_peers: 75, reserved_nodes: Vec::new(), non_reserved_mode: NonReservedPeerMode::Accept, + sentry_nodes: Vec::new(), client_version: "unknown".into(), node_name: "unknown".into(), transport: TransportConfig::Normal { diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index d9a5d417c4694fbb71faed702dc66724afe3a07b..e3b46c77200a2f4bfeaff6e4235c41e66b1fb377 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -155,6 +155,7 @@ fn node_config<G, E: Clone> ( out_peers: 450, reserved_nodes: vec![], non_reserved_mode: NonReservedPeerMode::Accept, + sentry_nodes: vec![], client_version: "network/test/0.1".to_owned(), node_name: "unknown".to_owned(), transport: TransportConfig::Normal {