diff --git a/substrate/.maintain/sentry-node/docker-compose.yml b/substrate/.maintain/sentry-node/docker-compose.yml index 37f0bea6b49856db4da2640742fc12bb2353ea78..0ca42613b4576e4405d14c78e154fc8ed6b614b1 100644 --- a/substrate/.maintain/sentry-node/docker-compose.yml +++ b/substrate/.maintain/sentry-node/docker-compose.yml @@ -47,8 +47,8 @@ services: - "--reserved-nodes" - "/dns4/sentry-a/tcp/30333/p2p/QmV7EhW6J6KgmNdr558RH1mPx2xGGznW7At4BhXzntRFsi" # Not only bind to localhost. - - "--ws-external" - - "--rpc-external" + - "--unsafe-ws-external" + - "--unsafe-rpc-external" # - "--log" # - "sub-libp2p=trace" # - "--log" @@ -88,8 +88,8 @@ services: - "--rpc-cors" - "all" # Not only bind to localhost. - - "--ws-external" - - "--rpc-external" + - "--unsafe-ws-external" + - "--unsafe-rpc-external" - "--log" - "sub-authority-discovery=trace" - "--sentry" @@ -121,8 +121,8 @@ services: - "--rpc-cors" - "all" # Not only bind to localhost. - - "--ws-external" - - "--rpc-external" + - "--unsafe-ws-external" + - "--unsafe-rpc-external" - "--log" - "sub-authority-discovery=trace" diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 5113c6a5ada7649048873d60225d1e8cb0bbadae..4e7410a9e618d3f44d2ce8dc4b3220a90b6e1306 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4933,9 +4933,10 @@ dependencies = [ "libp2p 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "quickcheck 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "sc-client-api 2.0.0", "sc-keystore 2.0.0", "sc-network 0.8.0", 
diff --git a/substrate/client/authority-discovery/Cargo.toml b/substrate/client/authority-discovery/Cargo.toml index fc1ac957858634397b7005958fe0cb400cc8f1bb..6ad25299f09433b9bc8fb56c098ddac64d9abe38 100644 --- a/substrate/client/authority-discovery/Cargo.toml +++ b/substrate/client/authority-discovery/Cargo.toml @@ -9,26 +9,27 @@ build = "build.rs" prost-build = "0.5.0" [dependencies] -sp-authority-discovery = { version = "2.0.0", path = "../../primitives/authority-discovery" } bytes = "0.4.12" -sc-client-api = { version = "2.0.0", path = "../api" } codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" } derive_more = "0.99.2" futures = "0.3.1" futures-timer = "2.0" -sc-keystore = { version = "2.0.0", path = "../keystore" } libp2p = { version = "0.13.2", default-features = false, features = ["secp256k1", "libp2p-websocket"] } log = "0.4.8" -sc-network = { version = "0.8", path = "../network" } -sp-core = { version = "2.0.0", path = "../../primitives/core" } -sp-blockchain = { version = "2.0.0", path = "../../primitives/blockchain" } prost = "0.5.0" +rand = "0.7.2" +sc-client-api = { version = "2.0.0", path = "../api" } +sc-keystore = { version = "2.0.0", path = "../keystore" } +sc-network = { version = "0.8", path = "../network" } serde_json = "1.0.41" +sp-authority-discovery = { version = "2.0.0", path = "../../primitives/authority-discovery" } +sp-blockchain = { version = "2.0.0", path = "../../primitives/blockchain" } +sp-core = { version = "2.0.0", path = "../../primitives/core" } sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" } [dev-dependencies] env_logger = "0.7.0" -parking_lot = "0.9.0" +quickcheck = "0.9.0" sc-peerset = { version = "2.0.0", path = "../peerset" } -sp-test-primitives = { version = "2.0.0", path = "../../primitives/test-primitives" } sp-api = { version = "2.0.0", path = "../../primitives/api" } +sp-test-primitives = { version = "2.0.0", path = "../../primitives/test-primitives" } diff 
--git a/substrate/client/authority-discovery/src/addr_cache.rs b/substrate/client/authority-discovery/src/addr_cache.rs
new file mode 100644
index 0000000000000000000000000000000000000000..357048de5490dd750b58371b24e63b6cec8a183e
--- /dev/null
+++ b/substrate/client/authority-discovery/src/addr_cache.rs
@@ -0,0 +1,229 @@
+// Copyright 2019 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
+
+use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
+use std::{
+	clone::Clone,
+	cmp::{Eq, Ord, PartialEq},
+	collections::BTreeMap,
+	convert::AsRef,
+	hash::Hash,
+};
+
+/// The maximum number of authority connections initialized through the authority discovery module.
+///
+/// In other words the maximum size of the `authority` peer set priority group.
+const MAX_NUM_AUTHORITY_CONN: usize = 10;
+
+/// Cache of Multiaddresses of authority nodes or their sentry nodes.
+//
+// The network peerset interface for priority groups lets us only set an entire group, but we
+// retrieve the addresses of other authorities one by one from the network. To use the peerset
+// interface we need to cache the addresses and always overwrite the entire peerset priority
+// group. To ensure this map doesn't grow indefinitely, `retain_ids` has to be called with the
+// current set of authorities whenever that set might have changed.
+pub(super) struct AddrCache<Id, Addr> {
+	/// Known addresses per authority. Each stored vector is non-empty and sorted bytewise.
+	cache: BTreeMap<Id, Vec<Addr>>,
+
+	/// Random number to seed address selection RNG.
+	///
+	/// A node should only try to connect to a subset of all authorities. To choose this subset one
+	/// uses randomness. The choice should differ between nodes to prevent hot spots, but not within
+	/// each node between each update to prevent connection churn. Thus before each selection we
+	/// seed an RNG with the same seed.
+	rand_addr_selection_seed: u64,
+}
+
+impl<Id, Addr> AddrCache<Id, Addr>
+where
+	Id: Clone + Eq + Hash + Ord,
+	Addr: Clone + PartialEq + AsRef<[u8]>,
+{
+	/// Create an empty cache with a freshly drawn random address selection seed.
+	pub fn new() -> Self {
+		AddrCache {
+			cache: BTreeMap::new(),
+			rand_addr_selection_seed: rand::thread_rng().gen(),
+		}
+	}
+
+	/// Store `addresses` as the addresses of authority `id`, replacing any previous entry.
+	///
+	/// An empty `addresses` vector is ignored, leaving an existing entry untouched.
+	pub fn insert(&mut self, id: Id, mut addresses: Vec<Addr>) {
+		if addresses.is_empty() {
+			return;
+		}
+
+		// Sort to get a canonical order: `get_subset` picks via a seeded RNG, so the same set of
+		// addresses results in the same pick regardless of the order they were passed in.
+		addresses.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
+		self.cache.insert(id, addresses);
+	}
+
+	/// Return at most `MAX_NUM_AUTHORITY_CONN` distinct addresses, at most one per authority.
+	///
+	/// Repeated calls on an unchanged cache return the same addresses.
+	//
+	// Each node should connect to a subset of all authorities. In order to prevent hot spots, this
+	// selection is based on randomness. Selecting randomly each time we alter the address cache
+	// would result in connection churn. To reduce this churn a node generates a seed on startup and
+	// uses this seed for a new rng on each update. (One could as well use one's peer id as a seed.
+	// Given that the peer id is publicly known, it would make this process predictable by others,
+	// which might be used as an attack.)
+	pub fn get_subset(&self) -> Vec<Addr> {
+		let mut rng = StdRng::seed_from_u64(self.rand_addr_selection_seed);
+
+		let mut addresses = self
+			.cache
+			.iter()
+			.map(|(_peer_id, addresses)| {
+				addresses
+					.choose(&mut rng)
+					.expect("an empty address vector is never inserted into the cache")
+			})
+			.cloned()
+			.collect::<Vec<Addr>>();
+
+		// `dedup` only removes consecutive duplicates, so sort first. Otherwise an address shared
+		// by two authorities that are not adjacent in the map would be returned twice.
+		addresses.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
+		addresses.dedup();
+
+		addresses
+			.choose_multiple(&mut rng, MAX_NUM_AUTHORITY_CONN)
+			.cloned()
+			.collect()
+	}
+
+	/// Remove the entries of all authorities that are not contained in `ids`.
+	pub fn retain_ids(&mut self, ids: &Vec<Id>) {
+		let to_remove = self
+			.cache
+			.iter()
+			.filter(|(id, _addresses)| !ids.contains(id))
+			.map(|entry| entry.0)
+			.cloned()
+			.collect::<Vec<Id>>();
+
+		for key in to_remove {
+			self.cache.remove(&key);
+		}
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use quickcheck::{QuickCheck, TestResult};
+
+	#[test]
+	fn returns_addresses_of_same_authorities_on_repeated_calls() {
+		fn property(input: Vec<(u32, Vec<String>)>) -> TestResult {
+			// Expect less than 1000 authorities.
+			if input.len() > 1000 {
+				return TestResult::discard();
+			}
+
+			// Expect less than 100 addresses per authority.
+			for i in &input {
+				if i.1.len() > 100 {
+					return TestResult::discard();
+				}
+			}
+
+			let mut c = AddrCache::new();
+
+			for (id, addresses) in input {
+				c.insert(id, addresses);
+			}
+
+			let result = c.get_subset();
+			assert!(result.len() <= MAX_NUM_AUTHORITY_CONN);
+
+			for _ in 1..100 {
+				assert_eq!(c.get_subset(), result);
+			}
+
+			TestResult::passed()
+		}
+
+		QuickCheck::new()
+			.max_tests(10)
+			.quickcheck(property as fn(Vec<(u32, Vec<String>)>) -> TestResult)
+	}
+
+	#[test]
+	fn returns_same_addresses_of_first_authority_when_second_authority_changes() {
+		let mut c = AddrCache::new();
+
+		// Insert addresses of first authority.
+		let addresses = (1..100)
+			.map(|i| format!("{:?}", i))
+			.collect::<Vec<String>>();
+		c.insert(1, addresses);
+		let first_subset = c.get_subset();
+		assert_eq!(1, first_subset.len());
+
+		// Insert address of second authority.
+		c.insert(2, vec!["a".to_string()]);
+		let second_subset = c.get_subset();
+		assert_eq!(2, second_subset.len());
+
+		// Expect same address of first authority.
+		assert!(second_subset.contains(&first_subset[0]));
+
+		// Alter address of second authority.
+		c.insert(2, vec!["b".to_string()]);
+		let second_subset = c.get_subset();
+		assert_eq!(2, second_subset.len());
+
+		// Expect same address of first authority.
+		assert!(second_subset.contains(&first_subset[0]));
+	}
+
+	#[test]
+	fn retains_only_entries_of_provided_ids() {
+		let mut cache = AddrCache::new();
+
+		cache.insert(1, vec![vec![10]]);
+		cache.insert(2, vec![vec![20]]);
+		cache.insert(3, vec![vec![30]]);
+
+		cache.retain_ids(&vec![1, 3]);
+
+		let mut subset = cache.get_subset();
+		subset.sort();
+
+		assert_eq!(vec![vec![10], vec![30]], subset);
+	}
+
+	#[test]
+	fn deduplicates_address_shared_by_non_adjacent_authorities() {
+		let mut cache = AddrCache::new();
+
+		// Authorities 1 and 3 share an address; authority 2 sits between them in the map.
+		cache.insert(1, vec![vec![20]]);
+		cache.insert(2, vec![vec![10]]);
+		cache.insert(3, vec![vec![20]]);
+
+		let mut subset = cache.get_subset();
+		subset.sort();
+
+		assert_eq!(vec![vec![10], vec![20]], subset);
+	}
+}
diff --git a/substrate/client/authority-discovery/src/error.rs b/substrate/client/authority-discovery/src/error.rs
index fdbd5b31fe2c1e6badb715072a08c9600bde42d0..b999df5d971567b1d6e6a051c6d72fcf8ab7d043 100644
--- a/substrate/client/authority-discovery/src/error.rs
+++ b/substrate/client/authority-discovery/src/error.rs
@@ -22,6 +22,10 @@ pub type Result<T> = std::result::Result<T, Error>;
 /// Error type for the authority discovery module.
 #[derive(Debug, derive_more::Display, derive_more::From)]
 pub enum Error {
+	/// Received dht value found event with records with different keys.
+	ReceivingDhtValueFoundEventWithDifferentKeys,
+	/// Received dht value found event with no records.
+	ReceivingDhtValueFoundEventWithNoRecords,
 	/// Failed to verify a dht payload with the given signature.
 	VerifyingDhtPayload,
 	/// Failed to hash the authority id to be used as a dht key.
diff --git a/substrate/client/authority-discovery/src/lib.rs b/substrate/client/authority-discovery/src/lib.rs index fe3da18ca642c3c89f7c4a6bc274e93998f52cec..3896100c01b5b022ba5237ec5087557a74585f70 100644 --- a/substrate/client/authority-discovery/src/lib.rs +++ b/substrate/client/authority-discovery/src/lib.rs @@ -45,7 +45,6 @@ //! 4. Adds the retrieved external addresses as priority nodes to the peerset. use std::collections::{HashMap, HashSet}; use std::convert::TryInto; -use std::iter::FromIterator; use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; @@ -55,26 +54,26 @@ use futures::task::{Context, Poll}; use futures::{Future, FutureExt, Stream, StreamExt}; use futures_timer::Delay; -use sp_authority_discovery::{ - AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair -}; -use sc_client_api::blockchain::HeaderBackend; use codec::{Decode, Encode}; use error::{Error, Result}; -use log::{debug, error, log_enabled, warn}; use libp2p::Multiaddr; +use log::{debug, error, log_enabled, warn}; +use prost::Message; +use sc_client_api::blockchain::HeaderBackend; use sc_network::specialization::NetworkSpecialization; use sc_network::{DhtEvent, ExHashT, NetworkStateInfo}; +use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair}; use sp_core::crypto::{key_types, Pair}; use sp_core::traits::BareCryptoStorePtr; -use prost::Message; use sp_runtime::generic::BlockId; use sp_runtime::traits::{Block as BlockT, ProvideRuntimeApi}; +use addr_cache::AddrCache; #[cfg(test)] mod tests; mod error; +mod addr_cache; /// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs. mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); @@ -89,12 +88,6 @@ const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30); /// discovery module. 
const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities"; -/// The maximum number of sentry node public addresses that we accept per authority. -/// -/// Everything above this threshold should be dropped to prevent a single authority from filling up -/// our peer set priority group. -const MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY: usize = 5; - /// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities. pub struct AuthorityDiscovery<Client, Network, Block> where @@ -124,12 +117,7 @@ where /// Interval on which to query for addresses of other authorities. query_interval: Interval, - /// The network peerset interface for priority groups lets us only set an entire group, but we - /// retrieve the addresses of other authorities one by one from the network. To use the peerset - /// interface we need to cache the addresses and always overwrite the entire peerset priority - /// group. To ensure this map doesn't grow indefinitely `purge_old_authorities_from_cache` - /// function is called each time we add a new entry. - address_cache: HashMap<AuthorityId, Vec<Multiaddr>>, + addr_cache: addr_cache::AddrCache<AuthorityId, Multiaddr>, phantom: PhantomData<Block>, } @@ -182,22 +170,12 @@ where } }).collect::<Vec<Multiaddr>>(); - if addrs.len() > MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY { - warn!( - target: "sub-authority-discovery", - "More than MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY ({:?}) were specified. 
Other \ - nodes will likely ignore the remainder.", - MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY, - ); - } - Some(addrs) } else { None }; - - let address_cache = HashMap::new(); + let addr_cache = AddrCache::new(); AuthorityDiscovery { client, @@ -207,7 +185,7 @@ where key_store, publish_interval, query_interval, - address_cache, + addr_cache, phantom: PhantomData, } } @@ -304,86 +282,70 @@ where &mut self, values: Vec<(libp2p::kad::record::Key, Vec<u8>)>, ) -> Result<()> { - debug!(target: "sub-authority-discovery", "Got Dht value from network."); - - let block_id = BlockId::hash(self.client.info().best_hash); - - // From the Dht we only get the hashed authority id. In order to retrieve the actual - // authority id and to ensure it is actually an authority, we match the hash against the - // hash of the authority id of all other authorities. - let authorities = self.client.runtime_api().authorities(&block_id)?; - self.purge_old_authorities_from_cache(&authorities); - - let authorities = authorities - .into_iter() - .map(|id| hash_authority_id(id.as_ref()).map(|h| (h, id))) - .collect::<Result<HashMap<_, _>>>()?; - - for (key, value) in values.iter() { - // Check if the event origins from an authority in the current authority set. - let authority_id: &AuthorityId = authorities - .get(key) - .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?; - - let schema::SignedAuthorityAddresses { - signature, - addresses, - } = schema::SignedAuthorityAddresses::decode(value).map_err(Error::DecodingProto)?; - let signature = AuthoritySignature::decode(&mut &signature[..]) - .map_err(Error::EncodingDecodingScale)?; - - if !AuthorityPair::verify(&signature, &addresses, authority_id) { - return Err(Error::VerifyingDhtPayload); + // Ensure `values` is not empty and all its keys equal. 
+ let remote_key = values.iter().fold(Ok(None), |acc, (key, _)| { + match acc { + Ok(None) => Ok(Some(key.clone())), + Ok(Some(ref prev_key)) if prev_key != key => Err( + Error::ReceivingDhtValueFoundEventWithDifferentKeys + ), + x @ Ok(_) => x, + Err(e) => Err(e), } - - let mut addresses: Vec<libp2p::Multiaddr> = schema::AuthorityAddresses::decode(addresses) - .map(|a| a.addresses) - .map_err(Error::DecodingProto)? + })?.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?; + + let authorities = { + let block_id = BlockId::hash(self.client.info().best_hash); + // From the Dht we only get the hashed authority id. In order to retrieve the actual + // authority id and to ensure it is actually an authority, we match the hash against the + // hash of the authority id of all other authorities. + let authorities = self.client.runtime_api().authorities(&block_id)?; + self.addr_cache.retain_ids(&authorities); + authorities .into_iter() - .map(|a| a.try_into()) - .collect::<std::result::Result<_, _>>() - .map_err(Error::ParsingMultiaddress)?; - - if addresses.len() > MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY { - warn!( - target: "sub-authority-discovery", - "Got more than MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY ({:?}) for Authority \ - '{:?}' from DHT, dropping the remainder.", - MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY, authority_id, - ); - addresses = addresses.into_iter() - .take(MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY) - .collect(); - } + .map(|id| hash_authority_id(id.as_ref()).map(|h| (h, id))) + .collect::<Result<HashMap<_, _>>>()? + }; - self.address_cache.insert(authority_id.clone(), addresses); - } + // Check if the event origins from an authority in the current authority set. 
+ let authority_id: &AuthorityId = authorities + .get(&remote_key) + .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?; + + let remote_addresses: Vec<Multiaddr> = values.into_iter() + .map(|(_k, v)| { + let schema::SignedAuthorityAddresses { + signature, + addresses, + } = schema::SignedAuthorityAddresses::decode(v).map_err(Error::DecodingProto)?; + let signature = AuthoritySignature::decode(&mut &signature[..]) + .map_err(Error::EncodingDecodingScale)?; + + if !AuthorityPair::verify(&signature, &addresses, authority_id) { + return Err(Error::VerifyingDhtPayload); + } - // Let's update the peerset priority group with all the addresses we have in our cache. + let addresses: Vec<libp2p::Multiaddr> = schema::AuthorityAddresses::decode(addresses) + .map(|a| a.addresses) + .map_err(Error::DecodingProto)? + .into_iter() + .map(|a| a.try_into()) + .collect::<std::result::Result<_, _>>() + .map_err(Error::ParsingMultiaddress)?; - let addresses = HashSet::from_iter( - self.address_cache - .iter() - .map(|(_peer_id, addresses)| addresses.clone()) - .flatten(), - ); + Ok(addresses) + }) + .collect::<Result<Vec<Vec<Multiaddr>>>>()? + .into_iter().flatten().collect(); - debug!( - target: "sub-authority-discovery", - "Applying priority group {:#?} to peerset.", addresses, - ); - self.network - .set_priority_group(AUTHORITIES_PRIORITY_GROUP_NAME.to_string(), addresses) - .map_err(Error::SettingPeersetPriorityGroup)?; + if !remote_addresses.is_empty() { + self.addr_cache.insert(authority_id.clone(), remote_addresses); + self.update_peer_set_priority_group()?; + } Ok(()) } - fn purge_old_authorities_from_cache(&mut self, current_authorities: &Vec<AuthorityId>) { - self.address_cache - .retain(|peer_id, _addresses| current_authorities.contains(peer_id)) - } - /// Retrieve all local authority discovery private keys that are within the current authority /// set. 
fn get_priv_keys_within_authority_set(&mut self) -> Result<Vec<AuthorityPair>> { @@ -429,6 +391,22 @@ where Ok(intersection) } + + /// Update the peer set 'authority' priority group. + // + fn update_peer_set_priority_group(&self) -> Result<()>{ + let addresses = self.addr_cache.get_subset(); + + debug!( + target: "sub-authority-discovery", + "Applying priority group {:?} to peerset.", addresses, + ); + self.network + .set_priority_group(AUTHORITIES_PRIORITY_GROUP_NAME.to_string(), addresses.into_iter().collect()) + .map_err(Error::SettingPeersetPriorityGroup)?; + + Ok(()) + } } impl<Client, Network, Block> Future for AuthorityDiscovery<Client, Network, Block> diff --git a/substrate/client/authority-discovery/src/tests.rs b/substrate/client/authority-discovery/src/tests.rs index bd81e791db14eb48f38b08ac51c7cc3999c5b5ec..b0e841b594f37ef9d1d1f19994770573e32a08a7 100644 --- a/substrate/client/authority-discovery/src/tests.rs +++ b/substrate/client/authority-discovery/src/tests.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see <http://www.gnu.org/licenses/>. -use std::sync::{Arc, Mutex}; +use std::{iter::FromIterator, sync::{Arc, Mutex}}; use futures::channel::mpsc::channel; use futures::executor::block_on;