Skip to content
Snippets Groups Projects
Unverified Commit e08452e9 authored by Max Inden's avatar Max Inden
Browse files

client/authority-discovery: Limit number of connections to authorities

Instead of connecting to all sentry nodes of all authorities, with this
patch the authority discovery module does the following:

- Choose one sentry node per authority at random.

- Choose MAX_NUM_AUTHORITY_CONN out of the above at random.

The module uses randomness to prevent hot spots, e.g. all nodes trying
to connect to a single node. If the authority discovery module chose
the nodes to connect to at random each time it learned of a new address,
the node would go through a lot of connection churn. Instead, it
creates a random seed at startup and seeds its RNG with that seed on
each update cycle.
parent e3962b68
Branches
No related merge requests found
......@@ -4927,6 +4927,7 @@ dependencies = [
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"sc-client-api 2.0.0",
"sc-keystore 2.0.0",
"sc-network 0.8.0",
......
......@@ -9,21 +9,22 @@ build = "build.rs"
prost-build = "0.5.0"
[dependencies]
sp-authority-discovery = { version = "2.0.0", path = "../../primitives/authority-discovery" }
bytes = "0.4.12"
sc-client-api = { version = "2.0.0", path = "../api" }
codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" }
derive_more = "0.99.2"
futures = "0.3.1"
futures-timer = "2.0"
sc-keystore = { version = "2.0.0", path = "../keystore" }
libp2p = { version = "0.13.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
log = "0.4.8"
sc-network = { version = "0.8", path = "../network" }
sp-core = { version = "2.0.0", path = "../../primitives/core" }
sp-blockchain = { version = "2.0.0", path = "../../primitives/blockchain" }
prost = "0.5.0"
rand = "0.7.2"
sc-client-api = { version = "2.0.0", path = "../api" }
sc-keystore = { version = "2.0.0", path = "../keystore" }
sc-network = { version = "0.8", path = "../network" }
serde_json = "1.0.41"
sp-authority-discovery = { version = "2.0.0", path = "../../primitives/authority-discovery" }
sp-blockchain = { version = "2.0.0", path = "../../primitives/blockchain" }
sp-core = { version = "2.0.0", path = "../../primitives/core" }
sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" }
[dev-dependencies]
......
......@@ -22,6 +22,10 @@ pub type Result<T> = std::result::Result<T, Error>;
/// Error type for the authority discovery module.
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
/// Received dht value found event with records with different keys.
ReceivingDhtValueFoundEventWithDifferentKeys,
/// Received dht value found event with no records.
ReceivingDhtValueFoundEventWithNoRecords,
/// Failed to verify a dht payload with the given signature.
VerifyingDhtPayload,
/// Failed to hash the authority id to be used as a dht key.
......
......@@ -45,7 +45,6 @@
//! 4. Adds the retrieved external addresses as priority nodes to the peerset.
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
......@@ -55,19 +54,18 @@ use futures::task::{Context, Poll};
use futures::{Future, FutureExt, Stream, StreamExt};
use futures_timer::Delay;
use sp_authority_discovery::{
AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair
};
use sc_client_api::blockchain::HeaderBackend;
use codec::{Decode, Encode};
use error::{Error, Result};
use log::{debug, error, log_enabled, warn};
use libp2p::Multiaddr;
use log::{debug, error, log_enabled, warn};
use prost::Message;
use rand::{Rng, rngs::StdRng, SeedableRng, seq::SliceRandom};
use sc_client_api::blockchain::HeaderBackend;
use sc_network::specialization::NetworkSpecialization;
use sc_network::{DhtEvent, ExHashT};
use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair};
use sp_core::crypto::{key_types, Pair};
use sp_core::traits::BareCryptoStorePtr;
use prost::Message;
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{Block as BlockT, ProvideRuntimeApi};
......@@ -86,11 +84,10 @@ const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30);
/// discovery module.
const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";
/// The maximum number of sentry node public addresses that we accept per authority.
/// The maximum number of authority connections initialized through the authority discovery module.
///
/// Everything above this threshold should be dropped to prevent a single authority from filling up
/// our peer set priority group.
const MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY: usize = 5;
/// In other words the maximum size of the `authority` peer set priority group.
const MAX_NUM_AUTHORITY_CONN: usize = 10;
/// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities.
pub struct AuthorityDiscovery<Client, Network, Block>
......@@ -122,12 +119,21 @@ where
/// Interval on which to query for addresses of other authorities.
query_interval: Interval,
/// The network peerset interface for priority groups lets us only set an entire group, but we
/// retrieve the addresses of other authorities one by one from the network. To use the peerset
/// interface we need to cache the addresses and always overwrite the entire peerset priority
/// group. To ensure this map doesn't grow indefinitely `purge_old_authorities_from_cache`
/// function is called each time we add a new entry.
/// Cache of Multiaddresses of authority nodes or their sentry nodes.
//
// The network peerset interface for priority groups lets us only set an entire group, but we
// retrieve the addresses of other authorities one by one from the network. To use the peerset
// interface we need to cache the addresses and always overwrite the entire peerset priority
// group. To ensure this map doesn't grow indefinitely `purge_old_authorities_from_cache`
// function is called each time we add a new entry.
address_cache: HashMap<AuthorityId, Vec<Multiaddr>>,
/// Random number to seed address selection RNG.
///
/// A node should only try to connect to a subset of all authorities. To choose this subset one
/// uses randomness. The choice should differ between nodes to prevent hot spots, but not within
/// each node between each update to prevent connection churn. Thus before each selection we
/// seed an RNG with the same seed.
rand_addr_selection_seed: u64,
phantom: PhantomData<Block>,
}
......@@ -180,15 +186,6 @@ where
}
}).collect::<Vec<Multiaddr>>();
if addrs.len() > MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY {
warn!(
target: "sub-authority-discovery",
"More than MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY ({:?}) were specified. Other \
nodes will likely ignore the remainder.",
MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY,
);
}
Some(addrs)
} else {
None
......@@ -196,6 +193,7 @@ where
let address_cache = HashMap::new();
let rand_addr_selection_seed = rand::thread_rng().gen();
AuthorityDiscovery {
client,
......@@ -206,6 +204,7 @@ where
publish_interval,
query_interval,
address_cache,
rand_addr_selection_seed,
phantom: PhantomData,
}
}
......@@ -302,31 +301,41 @@ where
&mut self,
values: Vec<(libp2p::kad::record::Key, Vec<u8>)>,
) -> Result<()> {
debug!(target: "sub-authority-discovery", "Got Dht value from network.");
let block_id = BlockId::hash(self.client.info().best_hash);
// From the Dht we only get the hashed authority id. In order to retrieve the actual
// authority id and to ensure it is actually an authority, we match the hash against the
// hash of the authority id of all other authorities.
let authorities = self.client.runtime_api().authorities(&block_id)?;
self.purge_old_authorities_from_cache(&authorities);
let authorities = authorities
.into_iter()
.map(|id| hash_authority_id(id.as_ref()).map(|h| (h, id)))
.collect::<Result<HashMap<_, _>>>()?;
// Ensure `values` is not empty and all its keys equal.
let remote_key = values.iter().fold(Ok(None), |acc, (key, _)| {
match acc {
Ok(None) => Ok(Some(key.clone())),
Ok(Some(ref prev_key)) if prev_key != key => Err(
Error::ReceivingDhtValueFoundEventWithDifferentKeys
),
x @ Ok(_) => x,
Err(e) => Err(e),
}
})?.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;
let authorities = {
let block_id = BlockId::hash(self.client.info().best_hash);
// From the Dht we only get the hashed authority id. In order to retrieve the actual
// authority id and to ensure it is actually an authority, we match the hash against the
// hash of the authority id of all other authorities.
let authorities = self.client.runtime_api().authorities(&block_id)?;
self.purge_old_authorities_from_cache(&authorities);
authorities
.into_iter()
.map(|id| hash_authority_id(id.as_ref()).map(|h| (h, id)))
.collect::<Result<HashMap<_, _>>>()?
};
for (key, value) in values.iter() {
// Check if the event origins from an authority in the current authority set.
let authority_id: &AuthorityId = authorities
.get(key)
.ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;
// Check if the event origins from an authority in the current authority set.
let authority_id: &AuthorityId = authorities
.get(&remote_key)
.ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;
let mut remote_addresses: Vec<Multiaddr> = values.into_iter().map(|(_k, v)| {
let schema::SignedAuthorityAddresses {
signature,
addresses,
} = schema::SignedAuthorityAddresses::decode(value).map_err(Error::DecodingProto)?;
} = schema::SignedAuthorityAddresses::decode(v).map_err(Error::DecodingProto)?;
let signature = AuthoritySignature::decode(&mut &signature[..])
.map_err(Error::EncodingDecodingScale)?;
......@@ -334,7 +343,7 @@ where
return Err(Error::VerifyingDhtPayload);
}
let mut addresses: Vec<libp2p::Multiaddr> = schema::AuthorityAddresses::decode(addresses)
let addresses: Vec<libp2p::Multiaddr> = schema::AuthorityAddresses::decode(addresses)
.map(|a| a.addresses)
.map_err(Error::DecodingProto)?
.into_iter()
......@@ -342,38 +351,19 @@ where
.collect::<std::result::Result<_, _>>()
.map_err(Error::ParsingMultiaddress)?;
if addresses.len() > MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY {
warn!(
target: "sub-authority-discovery",
"Got more than MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY ({:?}) for Authority \
'{:?}' from DHT, dropping the remainder.",
MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY, authority_id,
);
addresses = addresses.into_iter()
.take(MAX_NUM_SENTRY_ADDRESSES_PER_AUTHORITY)
.collect();
}
self.address_cache.insert(authority_id.clone(), addresses);
Ok(addresses)
})
// TODO: Can we do this nicer?
.collect::<Result<Vec<Vec<Multiaddr>>>>()?
.into_iter().flatten().collect();
if !remote_addresses.is_empty() {
// TODO: Handle unwrap
remote_addresses.sort_by(|a, b| a.as_ref().partial_cmp(b.as_ref()).unwrap());
self.address_cache.insert(authority_id.clone(), remote_addresses);
self.update_peer_set_priority_group()?;
}
// Let's update the peerset priority group with all the addresses we have in our cache.
let addresses = HashSet::from_iter(
self.address_cache
.iter()
.map(|(_peer_id, addresses)| addresses.clone())
.flatten(),
);
debug!(
target: "sub-authority-discovery",
"Applying priority group {:#?} to peerset.", addresses,
);
self.network
.set_priority_group(AUTHORITIES_PRIORITY_GROUP_NAME.to_string(), addresses)
.map_err(Error::SettingPeersetPriorityGroup)?;
Ok(())
}
......@@ -427,6 +417,41 @@ where
Ok(intersection)
}
/// Update the peer set 'authority' priority group.
///
/// Picks one cached address per authority at random, removes duplicate addresses, selects at
/// most `MAX_NUM_AUTHORITY_CONN` of the remainder at random and hands the result to the network
/// as the `AUTHORITIES_PRIORITY_GROUP_NAME` priority group.
///
/// Returns `Error::SettingPeersetPriorityGroup` if the network rejects the group.
//
// Each node should connect to a subset of all authorities. In order to prevent hot spots, this
// selection is based on randomness. Selecting randomly each time we change the address cache
// would result in connection churn. Instead a node generates a seed on startup and uses this
// seed for a new rng on each update. (One could as well use one's peer id as a seed. Given that
// the peer id is publicly known, it would make this process predictable by others, which might
// be used as an attack.)
//
// NOTE(review): the per-authority picks consume the rng in `address_cache` iteration order. If
// that order differs between updates (e.g. after inserts into the `HashMap`), the picks may
// differ as well - confirm whether a stable iteration order is required here.
fn update_peer_set_priority_group(&self) -> Result<()> {
	// Re-seed on every call so that the random choices are driven by the same seed each time.
	let mut rng = StdRng::seed_from_u64(self.rand_addr_selection_seed);

	// One address per authority.
	let mut addresses = self.address_cache
		.iter()
		.map(|(_peer_id, addresses)| addresses.choose(&mut rng)
			.expect("an empty address vector is never inserted into the cache"))
		.cloned()
		.collect::<Vec<Multiaddr>>();

	// Sort before deduplicating: `Vec::dedup` only removes *consecutive* duplicates, so it is
	// only effective on a sorted vector. Byte slices have a total order, hence `cmp` instead of
	// `partial_cmp(..).unwrap()`.
	addresses.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
	addresses.dedup();

	// Cap the size of the priority group.
	let addresses = addresses
		.choose_multiple(&mut rng, MAX_NUM_AUTHORITY_CONN)
		.cloned()
		.collect::<Vec<Multiaddr>>();

	debug!(
		target: "sub-authority-discovery",
		"Applying priority group {:?} to peerset.", addresses,
	);

	self.network
		.set_priority_group(
			AUTHORITIES_PRIORITY_GROUP_NAME.to_string(),
			addresses.into_iter().collect(),
		)
		.map_err(Error::SettingPeersetPriorityGroup)?;

	Ok(())
}
}
impl<Client, Network, Block> Future for AuthorityDiscovery<Client, Network, Block>
......@@ -546,16 +571,16 @@ fn interval_at(start: Instant, duration: Duration) -> Interval {
#[cfg(test)]
mod tests {
use super::*;
use sp_api::{ApiExt, Core, RuntimeVersion, StorageProof};
use futures::channel::mpsc::channel;
use futures::executor::block_on;
use futures::future::poll_fn;
use sp_api::{ApiExt, Core, RuntimeVersion, StorageProof};
use sp_core::{ExecutionContext, NativeOrEncoded, testing::KeyStore};
use sp_runtime::traits::Zero;
use sp_runtime::traits::{ApiRef, Block as BlockT, NumberFor, ProvideRuntimeApi};
use std::sync::{Arc, Mutex};
use sp_test_primitives::Block;
use std::{iter::FromIterator, sync::{Arc, Mutex}};
use super::*;
#[test]
fn interval_at_with_start_now() {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment