Unverified Commit 0dc1fcdf authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

Companion PR for #7247 (incremental priority group updates) (#1800)

* validator discovery: use incremental updates for priority_group

* validator discovery: fix compilation

* validator discovery: remove Sync bound on Net

* "Update Substrate"

Co-authored-by: parity-processbot <>
parent bc7d1322
Pipeline #110079 passed with stages
in 24 minutes and 53 seconds
This diff is collapsed.
......@@ -840,8 +840,13 @@ mod tests {
}
}
#[async_trait]
impl validator_discovery::Network for TestNetwork {
fn set_priority_group(&self, _group_id: String, _multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
async fn add_to_priority_group(&mut self, _group_id: String, _multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
Ok(())
}
async fn remove_from_priority_group(&mut self, _group_id: String, _multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
Ok(())
}
}
......
......@@ -31,11 +31,12 @@ use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
const PRIORITY_GROUP: &'static str = "parachain_validators";
/// An abstraction over networking for the purposes of validator discovery service.
#[async_trait]
pub trait Network: Send + 'static {
/// Ask the network to connect to these nodes and not disconnect from them until removed from the priority group.
fn set_priority_group(&self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
// TODO (ordian): we might want to add `add_to_priority_group` and `remove_from_priority_group`
// https://github.com/paritytech/polkadot/issues/1763
async fn add_to_priority_group(&mut self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
/// Remove the peers from the priority group.
async fn remove_from_priority_group(&mut self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
}
/// An abstraction over the authority discovery service.
......@@ -47,9 +48,14 @@ pub trait AuthorityDiscovery: Send + 'static {
async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId>;
}
#[async_trait]
impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
fn set_priority_group(&self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::set_priority_group(&**self, group_id, multiaddresses)
async fn add_to_priority_group(&mut self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::add_to_priority_group(&**self, group_id, multiaddresses).await
}
async fn remove_from_priority_group(&mut self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::remove_from_priority_group(&**self, group_id, multiaddresses).await
}
}
......@@ -118,8 +124,6 @@ pub(super) struct Service<N, AD> {
// in the `connected_validators` map.
// Invariant: the value > 0 for non-revoked requests.
requested_validators: HashMap<AuthorityDiscoveryId, u64>,
// keep for the network priority_group updates
validator_multiaddresses: HashSet<Multiaddr>,
non_revoked_discovery_requests: Vec<NonRevokedConnectionRequestState>,
// PhantomData used to make the struct generic instead of having generic methods
network: PhantomData<N>,
......@@ -131,7 +135,6 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
Self {
connected_validators: HashMap::new(),
requested_validators: HashMap::new(),
validator_multiaddresses: HashSet::new(),
non_revoked_discovery_requests: Vec::new(),
network: PhantomData,
authority_discovery: PhantomData,
......@@ -150,7 +153,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
validator_ids: Vec<AuthorityDiscoveryId>,
mut connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
revoke: oneshot::Receiver<()>,
network_service: N,
mut network_service: N,
mut authority_discovery_service: AD,
) -> (N, AD) {
const MAX_ADDR_PER_PEER: usize = 3;
......@@ -204,6 +207,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
}
// collect multiaddress of validators
let mut multiaddr_to_add = HashSet::new();
for authority in validator_ids.iter().cloned() {
let result = authority_discovery_service.get_addresses_by_authority_id(authority).await;
if let Some(addresses) = result {
......@@ -213,7 +217,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
// They are going to be removed soon though:
// https://github.com/paritytech/substrate/issues/6845
for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) {
self.validator_multiaddresses.insert(addr);
multiaddr_to_add.insert(addr);
}
}
}
......@@ -238,25 +242,26 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
}
// multiaddresses to remove
let mut multiaddr_to_remove = HashSet::new();
for id in revoked_validators.into_iter() {
let result = authority_discovery_service.get_addresses_by_authority_id(id).await;
if let Some(addresses) = result {
for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) {
self.validator_multiaddresses.remove(&addr);
multiaddr_to_remove.insert(addr);
}
}
}
// ask the network to connect to these nodes and not disconnect
// from them until removed from the priority group
// TODO (ordian): this clones the whole set of multaddresses
// TODO (ordian): use add_to_priority_group for incremental updates?
if let Err(e) = network_service.set_priority_group(
if let Err(e) = network_service.add_to_priority_group(
PRIORITY_GROUP.to_owned(),
self.validator_multiaddresses.clone(),
) {
multiaddr_to_add,
).await {
log::warn!(target: super::TARGET, "AuthorityDiscoveryService returned an invalid multiaddress: {}", e);
}
// the addresses are known to be valid
let _ = network_service.remove_from_priority_group(PRIORITY_GROUP.to_owned(), multiaddr_to_remove).await;
let pending = validator_ids.iter()
.cloned()
......@@ -311,8 +316,7 @@ mod tests {
#[derive(Default)]
struct TestNetwork {
// Mutex is used because of &self signature of set_priority_group
priority_group: std::sync::Mutex<HashSet<Multiaddr>>,
priority_group: HashSet<Multiaddr>,
}
struct TestAuthorityDiscovery {
......@@ -337,10 +341,15 @@ mod tests {
}
}
#[async_trait]
impl Network for TestNetwork {
fn set_priority_group(&self, _group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
let mut group = self.priority_group.lock().unwrap();
*group = multiaddresses;
async fn add_to_priority_group(&mut self, _group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
self.priority_group.extend(multiaddresses.into_iter());
Ok(())
}
async fn remove_from_priority_group(&mut self, _group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
self.priority_group.retain(|elem| !multiaddresses.contains(elem));
Ok(())
}
}
......@@ -570,7 +579,7 @@ mod tests {
let _ = receiver.next().await.unwrap();
assert_eq!(service.non_revoked_discovery_requests.len(), 1);
assert_eq!(ns.priority_group.lock().unwrap().len(), 2);
assert_eq!(ns.priority_group.len(), 2);
// revoke the second request
revoke_tx.send(()).unwrap();
......@@ -588,7 +597,7 @@ mod tests {
let _ = receiver.next().await.unwrap();
assert_eq!(service.non_revoked_discovery_requests.len(), 1);
assert_eq!(ns.priority_group.lock().unwrap().len(), 1);
assert_eq!(ns.priority_group.len(), 1);
});
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment