From addd8628adca59be754a6809367ec51f69eede08 Mon Sep 17 00:00:00 2001 From: Alin Dima <alindima99@pm.me> Date: Tue, 1 Aug 2023 13:39:15 +0300 Subject: [PATCH] network: optimize update procedure for listen_addrs and external_addrs (#14689) * network: optimize listen_address update procedure * network: optimize external_addr update procedure * replace on_swarm_event with add/remove --- substrate/client/network/src/behaviour.rs | 10 ++++-- substrate/client/network/src/peer_info.rs | 33 ++++++++++++++++++-- substrate/client/network/src/service.rs | 38 +++++++++++------------ 3 files changed, 57 insertions(+), 24 deletions(-) diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index 4799807d2cb..e9b55a48e6d 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -34,9 +34,10 @@ use libp2p::{ identity::PublicKey, kad::RecordKey, swarm::NetworkBehaviour, PeerId, }; +use parking_lot::Mutex; use sc_network_common::role::{ObservedRole, Roles}; use sp_runtime::traits::Block as BlockT; -use std::{collections::HashSet, time::Duration}; +use std::{collections::HashSet, sync::Arc, time::Duration}; pub use crate::request_responses::{InboundFailure, OutboundFailure, RequestId, ResponseFailure}; @@ -174,10 +175,15 @@ impl<B: BlockT> Behaviour<B> { request_response_protocols: Vec<ProtocolConfig>, peerset: PeersetHandle, connection_limits: ConnectionLimits, + external_addresses: Arc<Mutex<HashSet<Multiaddr>>>, ) -> Result<Self, request_responses::RegisterError> { Ok(Self { substrate, - peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), + peer_info: peer_info::PeerInfoBehaviour::new( + user_agent, + local_public_key, + external_addresses, + ), discovery: disco_config.finish(), connection_limits: libp2p::connection_limits::Behaviour::new(connection_limits), request_responses: request_responses::RequestResponsesBehaviour::new( diff --git a/substrate/client/network/src/peer_info.rs b/substrate/client/network/src/peer_info.rs index d7f9e6858fa..4a9908e8e1c 100644 --- a/substrate/client/network/src/peer_info.rs +++ b/substrate/client/network/src/peer_info.rs @@ -43,11 +43,13 @@ use libp2p::{ Multiaddr, PeerId, }; use log::{debug, error, trace}; +use parking_lot::Mutex; use smallvec::SmallVec; use std::{ - collections::hash_map::Entry, + collections::{hash_map::Entry, HashSet}, pin::Pin, + sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -67,6 +69,8 @@ pub struct PeerInfoBehaviour { nodes_info: FnvHashMap<PeerId, NodeInfo>, /// Interval at which we perform garbage collection in `nodes_info`. garbage_collect: Pin<Box<dyn Stream<Item = ()> + Send>>, + /// Record keeping of external addresses. Data is queried by the `NetworkService`. + external_addresses: ExternalAddresses, } /// Information about a node we're connected to. @@ -91,9 +95,31 @@ impl NodeInfo { } } +/// Utility struct for tracking external addresses. The data is shared with the `NetworkService`. +#[derive(Debug, Clone, Default)] +pub struct ExternalAddresses { + addresses: Arc<Mutex<HashSet<Multiaddr>>>, +} + +impl ExternalAddresses { + /// Add an external address. + pub fn add(&mut self, addr: Multiaddr) { + self.addresses.lock().insert(addr); + } + + /// Remove an external address. + pub fn remove(&mut self, addr: &Multiaddr) { + self.addresses.lock().remove(addr); + } +} + impl PeerInfoBehaviour { /// Builds a new `PeerInfoBehaviour`. - pub fn new(user_agent: String, local_public_key: PublicKey) -> Self { + pub fn new( + user_agent: String, + local_public_key: PublicKey, + external_addresses: Arc<Mutex<HashSet<Multiaddr>>>, + ) -> Self { let identify = { let cfg = IdentifyConfig::new("/substrate/1.0".to_string(), local_public_key) .with_agent_version(user_agent) @@ -107,6 +133,7 @@ impl PeerInfoBehaviour { identify, nodes_info: FnvHashMap::default(), garbage_collect: Box::pin(interval(GARBAGE_COLLECT_INTERVAL)), + external_addresses: ExternalAddresses { addresses: external_addresses }, } } @@ -367,6 +394,7 @@ impl NetworkBehaviour for PeerInfoBehaviour { FromSwarm::ExpiredListenAddr(e) => { self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e)); self.identify.on_swarm_event(FromSwarm::ExpiredListenAddr(e)); + self.external_addresses.remove(e.addr); }, FromSwarm::NewExternalAddrCandidate(e) => { self.ping.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e)); @@ -375,6 +403,7 @@ impl NetworkBehaviour for PeerInfoBehaviour { FromSwarm::ExternalAddrConfirmed(e) => { self.ping.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e)); self.identify.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e)); + self.external_addresses.add(e.addr.clone()); }, FromSwarm::AddressChange(e @ AddressChange { peer_id, old, new, .. }) => { self.ping.on_swarm_event(FromSwarm::AddressChange(e)); diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 28ad85074c4..9d18bdfc621 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -104,9 +104,9 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> { /// Number of peers we're connected to. num_connected: Arc<AtomicUsize>, /// The local external addresses. - external_addresses: Arc<Mutex<Vec<Multiaddr>>>, + external_addresses: Arc<Mutex<HashSet<Multiaddr>>>, /// Listen addresses. Do **NOT** include a trailing `/p2p/` with our `PeerId`. - listen_addresses: Arc<Mutex<Vec<Multiaddr>>>, + listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>, /// Local copy of the `PeerId` of the local node. local_peer_id: PeerId, /// The `KeyPair` that defines the `PeerId` of the local node. @@ -301,6 +301,7 @@ where })?; let num_connected = Arc::new(AtomicUsize::new(0)); + let external_addresses = Arc::new(Mutex::new(HashSet::new())); // Build the swarm. let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = { @@ -354,6 +355,7 @@ where .with_max_established_incoming(Some( crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING, )), + external_addresses.clone(), ); match result { @@ -412,13 +414,12 @@ where Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone()); } - let external_addresses = Arc::new(Mutex::new(Vec::new())); - let listen_addresses = Arc::new(Mutex::new(Vec::new())); + let listen_addresses = Arc::new(Mutex::new(HashSet::new())); let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new())); let service = Arc::new(NetworkService { bandwidth, - external_addresses: external_addresses.clone(), + external_addresses, listen_addresses: listen_addresses.clone(), num_connected: num_connected.clone(), peerset: peerset_handle, @@ -434,7 +435,6 @@ where }); Ok(NetworkWorker { - external_addresses, listen_addresses, num_connected, network_service: swarm, @@ -694,12 +694,12 @@ where { /// Returns the local external addresses. fn external_addresses(&self) -> Vec<Multiaddr> { - self.external_addresses.lock().clone() + self.external_addresses.lock().iter().cloned().collect() } /// Returns the listener addresses (without trailing `/p2p/` with our `PeerId`). fn listen_addresses(&self) -> Vec<Multiaddr> { - self.listen_addresses.lock().clone() + self.listen_addresses.lock().iter().cloned().collect() } /// Returns the local Peer ID. @@ -1123,9 +1123,7 @@ where H: ExHashT, { /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. - external_addresses: Arc<Mutex<Vec<Multiaddr>>>, - /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. - listen_addresses: Arc<Mutex<Vec<Multiaddr>>>, + listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>, /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. num_connected: Arc<AtomicUsize>, /// The network service that can be extracted and shared through the codebase. @@ -1182,18 +1180,10 @@ where }, }; - // Update the variables shared with the `NetworkService`. + // Update the `num_connected` count shared with the `NetworkService`. let num_connected_peers = self.network_service.behaviour_mut().user_protocol_mut().num_connected_peers(); self.num_connected.store(num_connected_peers, Ordering::Relaxed); - { - let external_addresses = self.network_service.external_addresses().cloned().collect(); - *self.external_addresses.lock() = external_addresses; - - let listen_addresses = - self.network_service.listeners().map(ToOwned::to_owned).collect(); - *self.listen_addresses.lock() = listen_addresses; - } if let Some(metrics) = self.metrics.as_ref() { if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() { @@ -1602,12 +1592,14 @@ where if let Some(metrics) = self.metrics.as_ref() { metrics.listeners_local_addresses.inc(); } + self.listen_addresses.lock().insert(address.clone()); }, SwarmEvent::ExpiredListenAddr { address, .. } => { info!(target: "sub-libp2p", "📪 No longer listening on {}", address); if let Some(metrics) = self.metrics.as_ref() { metrics.listeners_local_addresses.dec(); } + self.listen_addresses.lock().remove(&address); }, SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => { if let Some(peer_id) = peer_id { @@ -1712,6 +1704,12 @@ where if let Some(metrics) = self.metrics.as_ref() { metrics.listeners_local_addresses.sub(addresses.len() as u64); } + let mut listen_addresses = self.listen_addresses.lock(); + for addr in &addresses { + listen_addresses.remove(addr); + } + drop(listen_addresses); + let addrs = addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", "); match reason { -- GitLab