From e08452e9170c696c77f307a9bb5b805070327187 Mon Sep 17 00:00:00 2001
From: Max Inden <mail@max-inden.de>
Date: Fri, 20 Dec 2019 22:07:21 +0100
Subject: [PATCH] client/authority-discovery: Limit number of connections to
 authorities

Instead of connecting to all sentry nodes of all authorities, with this
patch the authority discovery module does the following:

- Choose one sentry node per authority at random.

- Choose MAX_NUM_AUTHORITY_CONN out of the above at random.

The module uses randomness to prevent hot spots, e.g. all nodes trying
to connect to a single node. If the authority discovery module would
choose the nodes to connect to at random on each new address that it
learns of, the node would go through a lot of connection churn.  Instead
it creates a random seed at start up and uses this seed for its RNG on
each update cycle.
---
 Cargo.lock                              |   1 +
 client/authority-discovery/Cargo.toml   |  13 +-
 client/authority-discovery/src/error.rs |   4 +
 client/authority-discovery/src/lib.rs   | 183 ++++++++++++++----------
 4 files changed, 116 insertions(+), 85 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 67fa81af0e0..c65a4cfc4be 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4927,6 +4927,7 @@ dependencies = [
  "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "sc-client-api 2.0.0",
  "sc-keystore 2.0.0",
  "sc-network 0.8.0",
diff --git a/client/authority-discovery/Cargo.toml b/client/authority-discovery/Cargo.toml
index 3564ad477d4..d6f6ac1edc2 100644
--- a/client/authority-discovery/Cargo.toml
+++ b/client/authority-discovery/Cargo.toml
@@ -9,21 +9,22 @@ build = "build.rs"
 prost-build = "0.5.0"
 
 [dependencies]
-sp-authority-discovery = { version = "2.0.0", path = "../../primitives/authority-discovery" }
 bytes = "0.4.12"
-sc-client-api = { version = "2.0.0", path = "../api" }
 codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" }
 derive_more = "0.99.2"
 futures = "0.3.1"
 futures-timer = "2.0"
-sc-keystore = { version = "2.0.0", path = "../keystore" }
 libp2p = { version = "0.13.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
 log = "0.4.8"
-sc-network = { version = "0.8", path = "../network" }
-sp-core = { version = "2.0.0", path = "../../primitives/core" }
-sp-blockchain = { version = "2.0.0", path = "../../primitives/blockchain" }
 prost = "0.5.0"
+rand = "0.7.2"
+sc-client-api = { version = "2.0.0", path = "../api" }
+sc-keystore = { version = "2.0.0", path = "../keystore" }
+sc-network = { version = "0.8", path = "../network" }
 serde_json = "1.0.41"
+sp-authority-discovery = { version = "2.0.0", path = "../../primitives/authority-discovery" }
+sp-blockchain = { version = "2.0.0", path = "../../primitives/blockchain" }
+sp-core = { version = "2.0.0", path = "../../primitives/core" }
 sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" }
 
 [dev-dependencies]
diff --git a/client/authority-discovery/src/error.rs b/client/authority-discovery/src/error.rs
index fdbd5b31fe2..b999df5d971 100644
--- a/client/authority-discovery/src/error.rs
+++ b/client/authority-discovery/src/error.rs
@@ -22,6 +22,10 @@ pub type Result<T> = std::result::Result<T, Error>;
 /// Error type for the authority discovery module.
 #[derive(Debug, derive_more::Display, derive_more::From)]
 pub enum Error {
+	/// Received dht value found event with records with different keys.
+	ReceivingDhtValueFoundEventWithDifferentKeys,
+	/// Received dht value found event with no records.
+	ReceivingDhtValueFoundEventWithNoRecords,
 	/// Failed to verify a dht payload with the given signature.
 	VerifyingDhtPayload,
 	/// Failed to hash the authority id to be used as a dht key.
diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs
index d8cb074395b..f6270b57b5b 100644
--- a/client/authority-discovery/src/lib.rs
+++ b/client/authority-discovery/src/lib.rs
@@ -45,7 +45,6 @@
 //!    4. Adds the retrieved external addresses as priority nodes to the peerset.
 use std::collections::{HashMap, HashSet};
 use std::convert::TryInto;
-use std::iter::FromIterator;
 use std::marker::PhantomData;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -55,19 +54,18 @@ use futures::task::{Context, Poll};
 use futures::{Future, FutureExt, Stream, StreamExt};
 use futures_timer::Delay;
 
-use sp_authority_discovery::{
-	AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair
-};
-use sc_client_api::blockchain::HeaderBackend;
 use codec::{Decode, Encode};
 use error::{Error, Result};
-use log::{debug, error, log_enabled, warn};
 use libp2p::Multiaddr;
+use log::{debug, error, log_enabled, warn};
+use prost::Message;
+use rand::{Rng, rngs::StdRng, SeedableRng, seq::SliceRandom};
+use sc_client_api::blockchain::HeaderBackend;
 use sc_network::specialization::NetworkSpecialization;
 use sc_network::{DhtEvent, ExHashT};
+use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair};
 use sp_core::crypto::{key_types, Pair};
 use sp_core::traits::BareCryptoStorePtr;
-use prost::Message;
 use sp_runtime::generic::BlockId;
 use sp_runtime::traits::{Block as BlockT, ProvideRuntimeApi};
 
@@ -86,11 +84,10 @@ const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30);
 /// discovery module.
 const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";
 
-/// The maximum number of sentry node public addresses that we accept per authority.
+/// The maximum number of authority connections initialized through the authority discovery module.
 ///
-/// Everything above this threshold should be dropped to prevent a single authority from filling up
-/// our peer set priority group.
-const MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY: usize = 5;
+/// In other words the maximum size of the `authority` peer set priority group.
+const MAX_NUM_AUTHORITY_CONN: usize = 10;
 
 /// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities.
 pub struct AuthorityDiscovery<Client, Network, Block>
@@ -122,12 +119,21 @@ where
 	/// Interval on which to query for addresses of other authorities.
 	query_interval: Interval,
 
-	/// The network peerset interface for priority groups lets us only set an entire group, but we
-	/// retrieve the addresses of other authorities one by one from the network. To use the peerset
-	/// interface we need to cache the addresses and always overwrite the entire peerset priority
-	/// group. To ensure this map doesn't grow indefinitely `purge_old_authorities_from_cache`
-	/// function is called each time we add a new entry.
+	/// Cache of Multiaddresses of authority nodes or their sentry nodes.
+	//
+	// The network peerset interface for priority groups lets us only set an entire group, but we
+	// retrieve the addresses of other authorities one by one from the network. To use the peerset
+	// interface we need to cache the addresses and always overwrite the entire peerset priority
+	// group. To ensure this map doesn't grow indefinitely `purge_old_authorities_from_cache`
+	// function is called each time we add a new entry.
 	address_cache: HashMap<AuthorityId, Vec<Multiaddr>>,
+	/// Random number to seed address selection RNG.
+	///
+	/// A node should only try to connect to a subset of all authorities. To choose this subset one
+	/// uses randomness. The choice should differ between nodes to prevent hot spots, but not within
+	/// each node between each update to prevent connection churn. Thus before each selection we
+	/// seed an RNG with the same seed.
+	rand_addr_selection_seed: u64,
 
 	phantom: PhantomData<Block>,
 }
@@ -180,15 +186,6 @@ where
 				}
 			}).collect::<Vec<Multiaddr>>();
 
-			if addrs.len() > MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY {
-				warn!(
-					target: "sub-authority-discovery",
-					"More than MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY ({:?}) were specified. Other \
-					nodes will likely ignore the remainder.",
-					MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY,
-				);
-			}
-
 			Some(addrs)
 		} else {
 			None
@@ -196,6 +193,7 @@ where
 
 
 		let address_cache = HashMap::new();
+		let rand_addr_selection_seed = rand::thread_rng().gen();
 
 		AuthorityDiscovery {
 			client,
@@ -206,6 +204,7 @@ where
 			publish_interval,
 			query_interval,
 			address_cache,
+			rand_addr_selection_seed,
 			phantom: PhantomData,
 		}
 	}
@@ -302,31 +301,41 @@ where
 		&mut self,
 		values: Vec<(libp2p::kad::record::Key, Vec<u8>)>,
 	) -> Result<()> {
-		debug!(target: "sub-authority-discovery", "Got Dht value from network.");
-
-		let block_id = BlockId::hash(self.client.info().best_hash);
-
-		// From the Dht we only get the hashed authority id. In order to retrieve the actual
-		// authority id and to ensure it is actually an authority, we match the hash against the
-		// hash of the authority id of all other authorities.
-		let authorities = self.client.runtime_api().authorities(&block_id)?;
-		self.purge_old_authorities_from_cache(&authorities);
-
-		let authorities = authorities
-			.into_iter()
-			.map(|id| hash_authority_id(id.as_ref()).map(|h| (h, id)))
-			.collect::<Result<HashMap<_, _>>>()?;
+		// Ensure `values` is not empty and all its keys equal.
+		let remote_key = values.iter().fold(Ok(None), |acc, (key, _)| {
+			match acc {
+				Ok(None) => Ok(Some(key.clone())),
+				Ok(Some(ref prev_key)) if prev_key != key => Err(
+					Error::ReceivingDhtValueFoundEventWithDifferentKeys
+				),
+				x @ Ok(_) => x,
+				Err(e) => Err(e),
+			}
+		})?.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;
+
+		let authorities = {
+			let block_id = BlockId::hash(self.client.info().best_hash);
+			// From the Dht we only get the hashed authority id. In order to retrieve the actual
+			// authority id and to ensure it is actually an authority, we match the hash against the
+			// hash of the authority id of all other authorities.
+			let authorities = self.client.runtime_api().authorities(&block_id)?;
+			self.purge_old_authorities_from_cache(&authorities);
+			authorities
+				.into_iter()
+				.map(|id| hash_authority_id(id.as_ref()).map(|h| (h, id)))
+				.collect::<Result<HashMap<_, _>>>()?
+		};
 
-		for (key, value) in values.iter() {
-			// Check if the event origins from an authority in the current authority set.
-			let authority_id: &AuthorityId = authorities
-				.get(key)
-				.ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;
+		// Check if the event origins from an authority in the current authority set.
+		let authority_id: &AuthorityId = authorities
+			.get(&remote_key)
+			.ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;
 
+		let mut remote_addresses: Vec<Multiaddr> = values.into_iter().map(|(_k, v)| {
 			let schema::SignedAuthorityAddresses {
 				signature,
 				addresses,
-			} = schema::SignedAuthorityAddresses::decode(value).map_err(Error::DecodingProto)?;
+			} = schema::SignedAuthorityAddresses::decode(v).map_err(Error::DecodingProto)?;
 			let signature = AuthoritySignature::decode(&mut &signature[..])
 				.map_err(Error::EncodingDecodingScale)?;
 
@@ -334,7 +343,7 @@ where
 				return Err(Error::VerifyingDhtPayload);
 			}
 
-			let mut addresses: Vec<libp2p::Multiaddr> = schema::AuthorityAddresses::decode(addresses)
+			let addresses: Vec<libp2p::Multiaddr> = schema::AuthorityAddresses::decode(addresses)
 				.map(|a| a.addresses)
 				.map_err(Error::DecodingProto)?
 				.into_iter()
@@ -342,38 +351,19 @@ where
 				.collect::<std::result::Result<_, _>>()
 				.map_err(Error::ParsingMultiaddress)?;
 
-			if addresses.len() > MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY {
-				warn!(
-					target: "sub-authority-discovery",
-					"Got more than MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY ({:?}) for Authority \
-					'{:?}' from DHT, dropping the remainder.",
-					MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY,	authority_id,
-				);
-				addresses = addresses.into_iter()
-					.take(MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY)
-					.collect();
-			}
-
-			self.address_cache.insert(authority_id.clone(), addresses);
+			Ok(addresses)
+		})
+			// TODO: Can we do this nicer?
+			.collect::<Result<Vec<Vec<Multiaddr>>>>()?
+			.into_iter().flatten().collect();
+
+		if !remote_addresses.is_empty() {
+			// TODO: Handle unwrap
+			remote_addresses.sort_by(|a, b| a.as_ref().partial_cmp(b.as_ref()).unwrap());
+			self.address_cache.insert(authority_id.clone(), remote_addresses);
+			self.update_peer_set_priority_group()?;
 		}
 
-		// Let's update the peerset priority group with all the addresses we have in our cache.
-
-		let addresses = HashSet::from_iter(
-			self.address_cache
-				.iter()
-				.map(|(_peer_id, addresses)| addresses.clone())
-				.flatten(),
-		);
-
-		debug!(
-			target: "sub-authority-discovery",
-			"Applying priority group {:#?} to peerset.", addresses,
-		);
-		self.network
-			.set_priority_group(AUTHORITIES_PRIORITY_GROUP_NAME.to_string(), addresses)
-			.map_err(Error::SettingPeersetPriorityGroup)?;
-
 		Ok(())
 	}
 
@@ -427,6 +417,41 @@ where
 
 		Ok(intersection)
 	}
+
+	/// Update the peer set 'authority' priority group.
+	//
+	// Each node should connect to a subset of all authorities. In order to prevent hot spots, this
+	// selection is based on randomness. Selecting randomly each time we change the address cache
+	// would result in connection churn. Instead a node generates a seed on startup and uses this
+	// seed for a new rng on each update. (One could as well use ones peer id as a seed. Given that
+	// the peer id is publicly known, it would make this process predictable by others, which might
+	// be used as an attack.)
+	fn update_peer_set_priority_group(&self) -> Result<()>{
+		let mut rng = StdRng::seed_from_u64(self.rand_addr_selection_seed);
+
+		let mut addresses = self.address_cache
+			.iter()
+			.map(|(_peer_id, addresses)| addresses.choose(&mut rng)
+				 .expect("an empty address vector is never inserted into the cache"))
+			.cloned()
+			.collect::<Vec<Multiaddr>>();
+
+		addresses.dedup();
+		// TODO: Handle unwrap
+		addresses.sort_by(|a, b| a.as_ref().partial_cmp(b.as_ref()).unwrap());
+
+		addresses = addresses.choose_multiple(&mut rng, MAX_NUM_AUTHORITY_CONN).cloned().collect();
+
+		debug!(
+			target: "sub-authority-discovery",
+			"Applying priority group {:?} to peerset.", addresses,
+		);
+		self.network
+			.set_priority_group(AUTHORITIES_PRIORITY_GROUP_NAME.to_string(), addresses.into_iter().collect())
+			.map_err(Error::SettingPeersetPriorityGroup)?;
+
+		Ok(())
+	}
 }
 
 impl<Client, Network, Block> Future for AuthorityDiscovery<Client, Network, Block>
@@ -546,16 +571,16 @@ fn interval_at(start: Instant, duration: Duration) -> Interval {
 
 #[cfg(test)]
 mod tests {
-	use super::*;
-	use sp_api::{ApiExt, Core, RuntimeVersion, StorageProof};
 	use futures::channel::mpsc::channel;
 	use futures::executor::block_on;
 	use futures::future::poll_fn;
+	use sp_api::{ApiExt, Core, RuntimeVersion, StorageProof};
 	use sp_core::{ExecutionContext, NativeOrEncoded, testing::KeyStore};
 	use sp_runtime::traits::Zero;
 	use sp_runtime::traits::{ApiRef, Block as BlockT, NumberFor, ProvideRuntimeApi};
-	use std::sync::{Arc, Mutex};
 	use sp_test_primitives::Block;
+	use std::{iter::FromIterator, sync::{Arc, Mutex}};
+	use super::*;
 
 	#[test]
 	fn interval_at_with_start_now() {
-- 
GitLab