Unverified Commit 417999f2 authored by Pierre Krieger's avatar Pierre Krieger Committed by GitHub
Browse files

Properly remove peers from sets and merge the two Network traits (#2821)



* Properly remove peers from sets

* Actually rename all, I guess

* Merge the two Network traits

* Rename function

* Update node/network/bridge/src/network.rs

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* Fix erroneous change

* Update node/network/bridge/src/network.rs

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
parent c5df4fa3
Pipeline #132829 canceled with stages
in 5 minutes and 24 seconds
......@@ -273,7 +273,7 @@ impl<N, AD> NetworkBridge<N, AD> {
impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
where
Net: Network + validator_discovery::Network + Sync,
Net: Network + Sync,
AD: validator_discovery::AuthorityDiscovery,
Context: SubsystemContext<Message=NetworkBridgeMessage>,
{
......@@ -345,7 +345,7 @@ async fn handle_subsystem_messages<Context, N, AD>(
) -> Result<(), UnexpectedAbort>
where
Context: SubsystemContext<Message = NetworkBridgeMessage>,
N: Network + validator_discovery::Network,
N: Network,
AD: validator_discovery::AuthorityDiscovery,
{
// This is kept sorted, descending, by block number.
......@@ -835,7 +835,7 @@ async fn run_network<N, AD>(
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()>
where
N: Network + validator_discovery::Network,
N: Network,
AD: validator_discovery::AuthorityDiscovery,
{
let shared = Shared::default();
......@@ -1222,6 +1222,14 @@ mod tests {
.boxed()
}
async fn add_to_peers_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> {
Ok(())
}
async fn remove_from_peers_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> {
Ok(())
}
fn action_sink<'a>(&'a mut self)
-> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>
{
......@@ -1232,17 +1240,6 @@ mod tests {
}
}
#[async_trait]
impl validator_discovery::Network for TestNetwork {
async fn add_peers_to_reserved_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> {
Ok(())
}
async fn remove_peers_from_reserved_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> {
Ok(())
}
}
#[async_trait]
impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery {
async fn get_addresses_by_authority_id(&mut self, _authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>> {
......
......@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::borrow::Cow;
use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
......@@ -26,7 +28,7 @@ use parity_scale_codec::Encode;
use sc_network::Event as NetworkEvent;
use sc_network::{IfDisconnected, NetworkService, OutboundFailure, RequestFailure};
use sc_network::config::parse_addr;
use sc_network::{config::parse_addr, multiaddr::Multiaddr};
use polkadot_node_network_protocol::{
peer_set::PeerSet,
......@@ -112,6 +114,13 @@ pub trait Network: Clone + Send + 'static {
/// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME)
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;
/// Ask the network to keep a substream open with these nodes and not disconnect from them
/// until removed from the protocol's peer set.
/// Note that `out_peers` setting has no effect on this.
async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
/// Cancels the effects of `add_to_peers_set`.
async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
/// Get access to an underlying sink for all network actions.
fn action_sink<'a>(
&'a mut self,
......@@ -175,6 +184,14 @@ impl Network for Arc<NetworkService<Block, Hash>> {
NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}
async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
}
async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::remove_from_peers_set(&**self, protocol, multiaddresses)
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn action_sink<'a>(
&'a mut self,
......
......@@ -16,10 +16,10 @@
//! A validator discovery service for the Network Bridge.
use crate::Network;
use core::marker::PhantomData;
use std::borrow::Cow;
use std::collections::{HashSet, HashMap, hash_map};
use std::sync::Arc;
use async_trait::async_trait;
use futures::channel::mpsc;
......@@ -27,20 +27,11 @@ use futures::channel::mpsc;
use sc_network::{config::parse_addr, multiaddr::Multiaddr};
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
use polkadot_primitives::v1::AuthorityDiscoveryId;
use polkadot_node_network_protocol::peer_set::{PeerSet, PerPeerSet};
const LOG_TARGET: &str = "parachain::validator-discovery";
/// An abstraction over networking for the purposes of validator discovery service.
#[async_trait]
pub trait Network: Send + 'static {
/// Ask the network to connect to these nodes and not disconnect from them until removed from the priority group.
async fn add_peers_to_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
/// Remove the peers from the priority group.
async fn remove_peers_from_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
}
/// An abstraction over the authority discovery service.
#[async_trait]
pub trait AuthorityDiscovery: Send + 'static {
......@@ -50,17 +41,6 @@ pub trait AuthorityDiscovery: Send + 'static {
async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId>;
}
#[async_trait]
impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
async fn add_peers_to_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
}
async fn remove_peers_from_reserved_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::remove_peers_from_reserved_set(&**self, protocol, multiaddresses)
}
}
#[async_trait]
impl AuthorityDiscovery for AuthorityDiscoveryService {
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>> {
......@@ -300,14 +280,14 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
// ask the network to connect to these nodes and not disconnect
// from them until removed from the set
if let Err(e) = network_service.add_peers_to_reserved_set(
if let Err(e) = network_service.add_to_peers_set(
peer_set.into_protocol_name(),
multiaddr_to_add.clone(),
).await {
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}
// the addresses are known to be valid
let _ = network_service.remove_peers_from_reserved_set(
let _ = network_service.remove_from_peers_set(
peer_set.into_protocol_name(),
multiaddr_to_remove.clone()
).await;
......@@ -357,12 +337,14 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
#[cfg(test)]
mod tests {
use super::*;
use crate::network::{Network, NetworkAction};
use futures::stream::StreamExt as _;
use std::{borrow::Cow, pin::Pin};
use futures::{sink::Sink, stream::{BoxStream, StreamExt as _}};
use sc_network::multiaddr::Protocol;
use sc_network::{Event as NetworkEvent, IfDisconnected};
use sp_keyring::Sr25519Keyring;
use polkadot_node_network_protocol::request_response::request::Requests;
fn new_service() -> Service<TestNetwork, TestAuthorityDiscovery> {
Service::new()
......@@ -372,7 +354,7 @@ mod tests {
(TestNetwork::default(), TestAuthorityDiscovery::new())
}
#[derive(Default)]
#[derive(Default, Clone)]
struct TestNetwork {
peers_set: HashSet<Multiaddr>,
}
......@@ -402,15 +384,28 @@ mod tests {
#[async_trait]
impl Network for TestNetwork {
async fn add_peers_to_reserved_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
panic!()
}
async fn add_to_peers_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
self.peers_set.extend(multiaddresses.into_iter());
Ok(())
}
async fn remove_peers_from_reserved_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
async fn remove_from_peers_set(&mut self, _protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
self.peers_set.retain(|elem| !multiaddresses.contains(elem));
Ok(())
}
fn action_sink<'a>(&'a mut self)
-> Pin<Box<dyn Sink<NetworkAction, Error = polkadot_subsystem::SubsystemError> + Send + 'a>>
{
panic!()
}
async fn start_request<AD: AuthorityDiscovery>(&self, _: &mut AD, _: Requests, _: IfDisconnected) {
}
}
#[async_trait]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment