From fa0142ac8fd44680494583baa21bca4281752052 Mon Sep 17 00:00:00 2001 From: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Mon, 5 Apr 2021 21:46:39 +0200 Subject: [PATCH] Properly remove peers from sets and merge the two Network traits (#2821) * Properly remove peers from sets * Actually rename all, I guess * Merge the two Network traits * Rename function * Update node/network/bridge/src/network.rs Co-authored-by: Andronik Ordian <write@reusable.software> * Fix erroneous change * Update node/network/bridge/src/network.rs Co-authored-by: Andronik Ordian <write@reusable.software> --- polkadot/node/network/bridge/src/lib.rs | 25 ++++---- polkadot/node/network/bridge/src/network.rs | 19 ++++++- .../network/bridge/src/validator_discovery.rs | 57 +++++++++---------- 3 files changed, 55 insertions(+), 46 deletions(-) diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 417135a5a24..efffc81234f 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -273,7 +273,7 @@ impl<N, AD> NetworkBridge<N, AD> { impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD> where - Net: Network + validator_discovery::Network + Sync, + Net: Network + Sync, AD: validator_discovery::AuthorityDiscovery, Context: SubsystemContext<Message=NetworkBridgeMessage>, { @@ -345,7 +345,7 @@ async fn handle_subsystem_messages<Context, N, AD>( ) -> Result<(), UnexpectedAbort> where Context: SubsystemContext<Message = NetworkBridgeMessage>, - N: Network + validator_discovery::Network, + N: Network, AD: validator_discovery::AuthorityDiscovery, { // This is kept sorted, descending, by block number. @@ -835,7 +835,7 @@ async fn run_network<N, AD>( mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>, ) -> SubsystemResult<()> where - N: Network + validator_discovery::Network, + N: Network, AD: validator_discovery::AuthorityDiscovery, { let shared = Shared::default(); @@ -1222,6 +1222,14 @@ mod tests { .boxed() } + async fn add_to_peers_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> { + Ok(()) + } + + async fn remove_from_peers_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> { + Ok(()) + } + fn action_sink<'a>(&'a mut self) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>> { @@ -1232,17 +1240,6 @@ mod tests { } } - #[async_trait] - impl validator_discovery::Network for TestNetwork { - async fn add_peers_to_reserved_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> { - Ok(()) - } - - async fn remove_peers_from_reserved_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> { - Ok(()) - } - } - #[async_trait] impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery { async fn get_addresses_by_authority_id(&mut self, _authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>> { diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs index 8d52b609233..2f6fa8d1f55 100644 --- a/polkadot/node/network/bridge/src/network.rs +++ b/polkadot/node/network/bridge/src/network.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see <http://www.gnu.org/licenses/>. +use std::borrow::Cow; +use std::collections::HashSet; use std::pin::Pin; use std::sync::Arc; @@ -26,7 +28,7 @@ use parity_scale_codec::Encode; use sc_network::Event as NetworkEvent; use sc_network::{IfDisconnected, NetworkService, OutboundFailure, RequestFailure}; -use sc_network::config::parse_addr; +use sc_network::{config::parse_addr, multiaddr::Multiaddr}; use polkadot_node_network_protocol::{ peer_set::PeerSet, @@ -112,6 +114,13 @@ pub trait Network: Clone + Send + 'static { /// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME) fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>; + /// Ask the network to keep a substream open with these nodes and not disconnect from them + /// until removed from the protocol's peer set. + /// Note that `out_peers` setting has no effect on this. + async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>; + /// Cancels the effects of `add_to_peers_set`. + async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>; + /// Get access to an underlying sink for all network actions. fn action_sink<'a>( &'a mut self, @@ -175,6 +184,14 @@ impl Network for Arc<NetworkService<Block, Hash>> { NetworkService::event_stream(self, "polkadot-network-bridge").boxed() } + async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> { + sc_network::NetworkService::add_peers_to_reserved_set(&**self, protocol, multiaddresses) + } + + async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> { + sc_network::NetworkService::remove_from_peers_set(&**self, protocol, multiaddresses) + } + #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] fn action_sink<'a>( &'a mut self, diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs index 4f34673a600..e6170d6c805 100644 --- a/polkadot/node/network/bridge/src/validator_discovery.rs +++ b/polkadot/node/network/bridge/src/validator_discovery.rs @@ -16,10 +16,10 @@ //! A validator discovery service for the Network Bridge. +use crate::Network; + use core::marker::PhantomData; -use std::borrow::Cow; use std::collections::{HashSet, HashMap, hash_map}; -use std::sync::Arc; use async_trait::async_trait; use futures::channel::mpsc; @@ -27,20 +27,11 @@ use futures::channel::mpsc; use sc_network::{config::parse_addr, multiaddr::Multiaddr}; use sc_authority_discovery::Service as AuthorityDiscoveryService; use polkadot_node_network_protocol::PeerId; -use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash}; +use polkadot_primitives::v1::AuthorityDiscoveryId; use polkadot_node_network_protocol::peer_set::{PeerSet, PerPeerSet}; const LOG_TARGET: &str = "parachain::validator-discovery"; -/// An abstraction over networking for the purposes of validator discovery service. -#[async_trait] -pub trait Network: Send + 'static { - /// Ask the network to connect to these nodes and not disconnect from them until removed from the priority group. - async fn add_peers_to_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>; - /// Remove the peers from the priority group. - async fn remove_peers_from_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>; -} - /// An abstraction over the authority discovery service. #[async_trait] pub trait AuthorityDiscovery: Send + 'static { @@ -50,17 +41,6 @@ pub trait AuthorityDiscovery: Send + 'static { async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId>; } -#[async_trait] -impl Network for Arc<sc_network::NetworkService<Block, Hash>> { - async fn add_peers_to_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> { - sc_network::NetworkService::add_peers_to_reserved_set(&**self, protocol, multiaddresses) - } - - async fn remove_peers_from_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> { - sc_network::NetworkService::remove_peers_from_reserved_set(&**self, protocol, multiaddresses) - } -} - #[async_trait] impl AuthorityDiscovery for AuthorityDiscoveryService { async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>> { @@ -300,14 +280,14 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> { // ask the network to connect to these nodes and not disconnect // from them until removed from the set - if let Err(e) = network_service.add_peers_to_reserved_set( + if let Err(e) = network_service.add_to_peers_set( peer_set.into_protocol_name(), multiaddr_to_add.clone(), ).await { tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress"); } // the addresses are known to be valid - let _ = network_service.remove_peers_from_reserved_set( + let _ = network_service.remove_from_peers_set( peer_set.into_protocol_name(), multiaddr_to_remove.clone() ).await; @@ -357,12 +337,14 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> { #[cfg(test)] mod tests { use super::*; + use crate::network::{Network, NetworkAction}; - use futures::stream::StreamExt as _; + use std::{borrow::Cow, pin::Pin}; + use futures::{sink::Sink, stream::{BoxStream, StreamExt as _}}; use sc_network::multiaddr::Protocol; - + use sc_network::{Event as NetworkEvent, IfDisconnected}; use sp_keyring::Sr25519Keyring; - + use polkadot_node_network_protocol::request_response::request::Requests; fn new_service() -> Service<TestNetwork, TestAuthorityDiscovery> { Service::new() @@ -372,7 +354,7 @@ mod tests { (TestNetwork::default(), TestAuthorityDiscovery::new()) } - #[derive(Default)] + #[derive(Default, Clone)] struct TestNetwork { peers_set: HashSet<Multiaddr>, } @@ -402,15 +384,28 @@ mod tests { #[async_trait] impl Network for TestNetwork { - async fn add_peers_to_reserved_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> { + fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { + panic!() + } + + async fn add_to_peers_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> { self.peers_set.extend(multiaddresses.into_iter()); Ok(()) } - async fn remove_peers_from_reserved_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> { + async fn remove_from_peers_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> { self.peers_set.retain(|elem| !multiaddresses.contains(elem)); Ok(()) } + + fn action_sink<'a>(&'a mut self) + -> Pin<Box<dyn Sink<NetworkAction, Error = polkadot_subsystem::SubsystemError> + Send + 'a>> + { + panic!() + } + + async fn start_request<AD: AuthorityDiscovery>(&self, _: &mut AD, _: Requests, _: IfDisconnected) { + } } #[async_trait] -- GitLab