Unverified Commit c0d681cc authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

validator-discovery: don't remove multiaddr of requested `PeerId`s (#4036)

* validator-discovery: remove from peer set before inserting

* bump spec versions

* rework into a companion

* fmt

* fix

* fix

* one more time

* one more try

* one more try

* Revert "one more try"

This reverts commit ab6568d3.

* one more try

* one more try

* Revert "one more try"

This reverts commit 8d7369f7

.

* fix a warning

* fix another warn

* correct log

* fix compilation

* ffs

* less cloning

* Apply suggestions from code review
Co-authored-by: Pierre Krieger's avatarPierre Krieger <pierre.krieger1708@gmail.com>

* add comments and a small refactoring

* use set_reserved_peers

* cargo update -p sp-io

* rename added to num_peers

* update Substrate
Co-authored-by: Pierre Krieger's avatarPierre Krieger <pierre.krieger1708@gmail.com>
Co-authored-by: parity-processbot <>
parent 1f8021af
Pipeline #161284 passed with stages
in 45 minutes and 33 seconds
This diff is collapsed.
......@@ -81,18 +81,14 @@ pub trait Network: Clone + Send + 'static {
/// Ask the network to keep a substream open with these nodes and not disconnect from them
/// until removed from the protocol's peer set.
/// Note that `out_peers` setting has no effect on this.
async fn add_to_peers_set(
async fn set_reserved_peers(
&mut self,
protocol: Cow<'static, str>,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String>;
/// Cancels the effects of `add_to_peers_set`.
async fn remove_from_peers_set(
&mut self,
protocol: Cow<'static, str>,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String>;
/// Removes the peers for the protocol's peer set (both reserved and non-reserved).
async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, peers: Vec<PeerId>);
/// Send a request to a remote peer.
async fn start_request<AD: AuthorityDiscovery>(
......@@ -118,25 +114,16 @@ impl Network for Arc<NetworkService<Block, Hash>> {
NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}
async fn add_to_peers_set(
async fn set_reserved_peers(
&mut self,
protocol: Cow<'static, str>,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String> {
sc_network::NetworkService::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
sc_network::NetworkService::set_reserved_peers(&**self, protocol, multiaddresses)
}
async fn remove_from_peers_set(
&mut self,
protocol: Cow<'static, str>,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String> {
sc_network::NetworkService::remove_peers_from_reserved_set(
&**self,
protocol.clone(),
multiaddresses.clone(),
)?;
sc_network::NetworkService::remove_from_peers_set(&**self, protocol, multiaddresses)
async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, peers: Vec<PeerId>) {
sc_network::NetworkService::remove_peers_from_reserved_set(&**self, protocol, peers);
}
fn report_peer(&self, who: PeerId, cost_benefit: Rep) {
......
......@@ -98,7 +98,7 @@ impl Network for TestNetwork {
.boxed()
}
async fn add_to_peers_set(
async fn set_reserved_peers(
&mut self,
_protocol: Cow<'static, str>,
_: HashSet<Multiaddr>,
......@@ -106,13 +106,7 @@ impl Network for TestNetwork {
Ok(())
}
async fn remove_from_peers_set(
&mut self,
_protocol: Cow<'static, str>,
_: HashSet<Multiaddr>,
) -> Result<(), String> {
Ok(())
}
async fn remove_from_peers_set(&mut self, _protocol: Cow<'static, str>, _: Vec<PeerId>) {}
async fn start_request<AD: AuthorityDiscovery>(
&self,
......
......@@ -23,10 +23,13 @@ use std::collections::HashSet;
use futures::channel::oneshot;
use sc_network::multiaddr::Multiaddr;
use sc_network::multiaddr::{self, Multiaddr};
pub use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
use polkadot_node_network_protocol::peer_set::{PeerSet, PerPeerSet};
use polkadot_node_network_protocol::{
peer_set::{PeerSet, PerPeerSet},
PeerId,
};
use polkadot_primitives::v1::AuthorityDiscoveryId;
const LOG_TARGET: &str = "parachain::validator-discovery";
......@@ -39,7 +42,7 @@ pub(super) struct Service<N, AD> {
#[derive(Default)]
struct StatePerPeerSet {
previously_requested: HashSet<Multiaddr>,
previously_requested: HashSet<PeerId>,
}
impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
......@@ -47,7 +50,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
Self { state: Default::default(), _phantom: PhantomData }
}
/// Connect to already resolved addresses:
/// Connect to already resolved addresses.
pub async fn on_resolved_request(
&mut self,
newly_requested: HashSet<Multiaddr>,
......@@ -55,31 +58,32 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
mut network_service: N,
) -> N {
let state = &mut self.state[peer_set];
// clean up revoked requests
let multiaddr_to_remove: HashSet<_> =
state.previously_requested.difference(&newly_requested).cloned().collect();
let multiaddr_to_add: HashSet<_> =
newly_requested.difference(&state.previously_requested).cloned().collect();
state.previously_requested = newly_requested;
let new_peer_ids: HashSet<PeerId> = extract_peer_ids(newly_requested.iter().cloned());
let num_peers = new_peer_ids.len();
let peers_to_remove: Vec<PeerId> =
state.previously_requested.difference(&new_peer_ids).cloned().collect();
let removed = peers_to_remove.len();
state.previously_requested = new_peer_ids;
tracing::debug!(
target: LOG_TARGET,
?peer_set,
added = multiaddr_to_add.len(),
removed = multiaddr_to_remove.len(),
?num_peers,
?removed,
"New ConnectToValidators resolved request",
);
// ask the network to connect to these nodes and not disconnect
// from them until removed from the set
if let Err(e) = network_service
.add_to_peers_set(peer_set.into_protocol_name(), multiaddr_to_add)
.set_reserved_peers(peer_set.into_protocol_name(), newly_requested)
.await
{
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}
// the addresses are known to be valid
let _ = network_service
.remove_from_peers_set(peer_set.into_protocol_name(), multiaddr_to_remove)
.remove_from_peers_set(peer_set.into_protocol_name(), peers_to_remove)
.await;
network_service
......@@ -136,6 +140,15 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
}
}
fn extract_peer_ids(multiaddr: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
multiaddr
.filter_map(|mut addr| match addr.pop() {
Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
_ => None,
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
......@@ -158,7 +171,7 @@ mod tests {
#[derive(Default, Clone)]
struct TestNetwork {
peers_set: HashSet<Multiaddr>,
peers_set: HashSet<PeerId>,
}
#[derive(Default, Clone, Debug)]
......@@ -171,9 +184,14 @@ mod tests {
fn new() -> Self {
let peer_ids = known_peer_ids();
let authorities = known_authorities();
let multiaddr = known_multiaddr();
let multiaddr = known_multiaddr().into_iter().zip(peer_ids.iter().cloned()).map(
|(mut addr, peer_id)| {
addr.push(multiaddr::Protocol::P2p(peer_id.into()));
addr
},
);
Self {
by_authority_id: authorities.iter().cloned().zip(multiaddr.into_iter()).collect(),
by_authority_id: authorities.iter().cloned().zip(multiaddr).collect(),
by_peer_id: peer_ids.into_iter().zip(authorities.into_iter()).collect(),
}
}
......@@ -185,22 +203,21 @@ mod tests {
panic!()
}
async fn add_to_peers_set(
async fn set_reserved_peers(
&mut self,
_protocol: Cow<'static, str>,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String> {
self.peers_set.extend(multiaddresses.into_iter());
self.peers_set = extract_peer_ids(multiaddresses.into_iter());
Ok(())
}
async fn remove_from_peers_set(
&mut self,
_protocol: Cow<'static, str>,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String> {
self.peers_set.retain(|elem| !multiaddresses.contains(elem));
Ok(())
peers: Vec<PeerId>,
) {
self.peers_set.retain(|elem| !peers.contains(elem));
}
async fn start_request<AD: AuthorityDiscovery>(
......@@ -281,9 +298,14 @@ mod tests {
let state = &service.state[PeerSet::Validation];
assert_eq!(state.previously_requested.len(), 1);
assert!(state
.previously_requested
.contains(ads.by_authority_id.get(&authority_ids[1]).unwrap()));
let peer_1 = extract_peer_ids(
vec![ads.by_authority_id.get(&authority_ids[1]).unwrap().clone()].into_iter(),
)
.iter()
.cloned()
.next()
.unwrap();
assert!(state.previously_requested.contains(&peer_1));
});
}
......@@ -310,9 +332,14 @@ mod tests {
let state = &service.state[PeerSet::Validation];
assert_eq!(state.previously_requested.len(), 1);
assert!(state
.previously_requested
.contains(ads.by_authority_id.get(&authority_ids[0]).unwrap()));
let peer_0 = extract_peer_ids(
vec![ads.by_authority_id.get(&authority_ids[0]).unwrap().clone()].into_iter(),
)
.iter()
.cloned()
.next()
.unwrap();
assert!(state.previously_requested.contains(&peer_0));
let failed = failed_rx.await.unwrap();
assert_eq!(failed, 1);
......
......@@ -256,6 +256,7 @@ pub fn new_test_ext() -> TestExternalities {
ext
}
#[cfg(feature = "runtime-benchmarks")]
pub fn new_test_ext_with_offset(n: BlockNumber) -> TestExternalities {
LeaseOffset::set(n);
new_test_ext()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment