diff --git a/Cargo.lock b/Cargo.lock
index bdbf6ddac268592e1979dabe5f3357926ecebcd8..074b657e767b1cc121a471f5aee662fc2290ebda 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -15555,6 +15555,7 @@ dependencies = [
  "futures-timer",
  "ip_network",
  "libp2p",
+ "linked_hash_set",
  "log",
  "multihash 0.18.1",
  "multihash-codetable",
diff --git a/cumulus/client/relay-chain-minimal-node/src/lib.rs b/cumulus/client/relay-chain-minimal-node/src/lib.rs
index 4bccca59fe3ea2eec816513778885f43c9504d51..6aea043713d873b77ecadc8b7efc7c7eb8c0039b 100644
--- a/cumulus/client/relay-chain-minimal-node/src/lib.rs
+++ b/cumulus/client/relay-chain-minimal-node/src/lib.rs
@@ -55,6 +55,7 @@ fn build_authority_discovery_service<Block: BlockT>(
 	prometheus_registry: Option<Registry>,
 ) -> AuthorityDiscoveryService {
 	let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
+	let auth_disc_public_addresses = config.network.public_addresses.clone();
 	let authority_discovery_role = sc_authority_discovery::Role::Discover;
 	let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move {
 		match e {
@@ -65,6 +66,7 @@ fn build_authority_discovery_service<Block: BlockT>(
 	let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config(
 		sc_authority_discovery::WorkerConfig {
 			publish_non_global_ips: auth_disc_publish_non_global_ips,
+			public_addresses: auth_disc_public_addresses,
 			// Require that authority discovery records are signed.
 			strict_record_validation: true,
 			..Default::default()
diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs
index 83a0afc077e7edfb4a60fb8ed4109157f020695b..4f4ede537055ebf58d0c126890256f9be2dad830 100644
--- a/polkadot/node/service/src/lib.rs
+++ b/polkadot/node/service/src/lib.rs
@@ -807,6 +807,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
 
 	let shared_voter_state = rpc_setup;
 	let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
+	let auth_disc_public_addresses = config.network.public_addresses.clone();
 	let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
 
 	let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
@@ -1061,6 +1062,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
 		let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config(
 			sc_authority_discovery::WorkerConfig {
 				publish_non_global_ips: auth_disc_publish_non_global_ips,
+				public_addresses: auth_disc_public_addresses,
 				// Require that authority discovery records are signed.
 				strict_record_validation: true,
 				..Default::default()
diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs
index 8f2aba6b44cd0a980771ec7d84eb383421551a6b..e4b425e6f96342ee02dd46e0ea31d7a26790677b 100644
--- a/substrate/bin/node/cli/src/service.rs
+++ b/substrate/bin/node/cli/src/service.rs
@@ -422,6 +422,7 @@ pub fn new_full_base(
 
 	let shared_voter_state = rpc_setup;
 	let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
+	let auth_disc_public_addresses = config.network.public_addresses.clone();
 	let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
 
 	let genesis_hash = client.block_hash(0).ok().flatten().expect("Genesis block exists; qed");
@@ -610,6 +611,7 @@ pub fn new_full_base(
 			sc_authority_discovery::new_worker_and_service_with_config(
 				sc_authority_discovery::WorkerConfig {
 					publish_non_global_ips: auth_disc_publish_non_global_ips,
+					public_addresses: auth_disc_public_addresses,
 					..Default::default()
 				},
 				client.clone(),
diff --git a/substrate/client/authority-discovery/Cargo.toml b/substrate/client/authority-discovery/Cargo.toml
index cdd4052f0b0998f283ede885f7e7de7509ecd169..26580064b3c453fb3a6ef77ff8d45413877f1d8d 100644
--- a/substrate/client/authority-discovery/Cargo.toml
+++ b/substrate/client/authority-discovery/Cargo.toml
@@ -29,6 +29,7 @@ multihash = { version = "0.18.1", default-features = false, features = [
 	"sha2",
 	"std",
 ] }
+linked_hash_set = "0.1.4"
 log = { workspace = true, default-features = true }
 prost = "0.12"
 rand = "0.8.5"
diff --git a/substrate/client/authority-discovery/src/lib.rs b/substrate/client/authority-discovery/src/lib.rs
index 6bb12804cada34d1b3f87b4249fc466fcf253a9a..281188de14324ed38793a8968341f6d450d3f8c0 100644
--- a/substrate/client/authority-discovery/src/lib.rs
+++ b/substrate/client/authority-discovery/src/lib.rs
@@ -80,6 +80,10 @@ pub struct WorkerConfig {
 	/// Defaults to `true` to avoid the surprise factor.
 	pub publish_non_global_ips: bool,
 
+	/// Public addresses set by the node operator to always publish first in the authority
+	/// discovery DHT record.
+	pub public_addresses: Vec<Multiaddr>,
+
 	/// Reject authority discovery records that are not signed by their network identity (PeerId)
 	///
 	/// Defaults to `false` to provide compatibility with old versions
@@ -104,6 +108,7 @@ impl Default for WorkerConfig {
 			// `authority_discovery_dht_event_received`.
 			max_query_interval: Duration::from_secs(10 * 60),
 			publish_non_global_ips: true,
+			public_addresses: Vec::new(),
 			strict_record_validation: false,
 		}
 	}
diff --git a/substrate/client/authority-discovery/src/worker.rs b/substrate/client/authority-discovery/src/worker.rs
index 9bccb96ff378f262e9b66b4268d2ed95befe2bdb..b77f0241ec2faf4d0466103b2bd07226715b334c 100644
--- a/substrate/client/authority-discovery/src/worker.rs
+++ b/substrate/client/authority-discovery/src/worker.rs
@@ -35,6 +35,7 @@ use addr_cache::AddrCache;
 use codec::{Decode, Encode};
 use ip_network::IpNetwork;
 use libp2p::{core::multiaddr, identity::PublicKey, multihash::Multihash, Multiaddr, PeerId};
+use linked_hash_set::LinkedHashSet;
 use multihash_codetable::{Code, MultihashDigest};
 
 use log::{debug, error, log_enabled};
@@ -120,14 +121,22 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
 
 	/// Interval to be proactive, publishing own addresses.
 	publish_interval: ExpIncInterval,
+
 	/// Pro-actively publish our own addresses at this interval, if the keys in the keystore
 	/// have changed.
 	publish_if_changed_interval: ExpIncInterval,
+
 	/// List of keys onto which addresses have been published at the latest publication.
 	/// Used to check whether they have changed.
 	latest_published_keys: HashSet<AuthorityId>,
+
 	/// Same value as in the configuration.
 	publish_non_global_ips: bool,
+
+	/// Public addresses set by the node operator to always publish first in the authority
+	/// discovery DHT record.
+	public_addresses: LinkedHashSet<Multiaddr>,
+
 	/// Same value as in the configuration.
 	strict_record_validation: bool,
 
@@ -136,6 +145,7 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
 
 	/// Queue of throttled lookups pending to be passed to the network.
 	pending_lookups: Vec<AuthorityId>,
+
 	/// Set of in-flight lookups.
 	in_flight_lookups: HashMap<KademliaKey, AuthorityId>,
 
@@ -224,6 +234,29 @@ where
 			None => None,
 		};
 
+		let public_addresses = {
+			let local_peer_id: Multihash = network.local_peer_id().into();
+
+			config
+				.public_addresses
+				.into_iter()
+				.map(|mut address| {
+					if let Some(multiaddr::Protocol::P2p(peer_id)) = address.iter().last() {
+						if peer_id != local_peer_id {
+							error!(
+								target: LOG_TARGET,
+								"Discarding invalid local peer ID in public address {address}.",
+							);
+						}
+						// Always discard `/p2p/...` protocol for proper address comparison (local
+						// peer id will be added before publishing).
+						address.pop();
+					}
+					address
+				})
+				.collect()
+		};
+
 		Worker {
 			from_service: from_service.fuse(),
 			client,
@@ -233,6 +266,7 @@ where
 			publish_if_changed_interval,
 			latest_published_keys: HashSet::new(),
 			publish_non_global_ips: config.publish_non_global_ips,
+			public_addresses,
 			strict_record_validation: config.strict_record_validation,
 			query_interval,
 			pending_lookups: Vec::new(),
@@ -304,32 +338,48 @@ where
 	}
 
 	fn addresses_to_publish(&self) -> impl Iterator<Item = Multiaddr> {
-		let peer_id: Multihash = self.network.local_peer_id().into();
 		let publish_non_global_ips = self.publish_non_global_ips;
-		let addresses = self.network.external_addresses().into_iter().filter(move |a| {
-			if publish_non_global_ips {
-				return true
-			}
+		let addresses = self
+			.public_addresses
+			.clone()
+			.into_iter()
+			.chain(self.network.external_addresses().into_iter().filter_map(|mut address| {
+				// Make sure the reported external address does not contain `/p2p/...` protocol.
+				if let Some(multiaddr::Protocol::P2p(_)) = address.iter().last() {
+					address.pop();
+				}
 
-			a.iter().all(|p| match p {
-				// The `ip_network` library is used because its `is_global()` method is stable,
-				// while `is_global()` in the standard library currently isn't.
-				multiaddr::Protocol::Ip4(ip) if !IpNetwork::from(ip).is_global() => false,
-				multiaddr::Protocol::Ip6(ip) if !IpNetwork::from(ip).is_global() => false,
-				_ => true,
+				if self.public_addresses.contains(&address) {
+					// Already added above.
+					None
+				} else {
+					Some(address)
+				}
+			}))
+			.filter(move |address| {
+				if publish_non_global_ips {
+					return true
+				}
+
+				address.iter().all(|protocol| match protocol {
+					// The `ip_network` library is used because its `is_global()` method is stable,
+					// while `is_global()` in the standard library currently isn't.
+					multiaddr::Protocol::Ip4(ip) if !IpNetwork::from(ip).is_global() => false,
+					multiaddr::Protocol::Ip6(ip) if !IpNetwork::from(ip).is_global() => false,
+					_ => true,
+				})
 			})
-		});
+			.collect::<Vec<_>>();
 
-		debug!(target: LOG_TARGET, "Authority DHT record peer_id='{:?}' addresses='{:?}'", peer_id, addresses.clone().collect::<Vec<_>>());
+		let peer_id = self.network.local_peer_id();
+		debug!(
+			target: LOG_TARGET,
+			"Authority DHT record peer_id='{peer_id}' addresses='{addresses:?}'",
+		);
 
-		// The address must include the peer id if not already set.
-		addresses.map(move |a| {
-			if a.iter().any(|p| matches!(p, multiaddr::Protocol::P2p(_))) {
-				a
-			} else {
-				a.with(multiaddr::Protocol::P2p(peer_id))
-			}
-		})
+		// The address must include the peer id.
+		let peer_id: Multihash = peer_id.into();
+		addresses.into_iter().map(move |a| a.with(multiaddr::Protocol::P2p(peer_id)))
 	}
 
 	/// Publish own public addresses.
diff --git a/substrate/client/network/Cargo.toml b/substrate/client/network/Cargo.toml
index cbf74440dc1a83933d1e994294a7d37609c72331..c6f17647166c4c406bd83003344177395599e67b 100644
--- a/substrate/client/network/Cargo.toml
+++ b/substrate/client/network/Cargo.toml
@@ -29,7 +29,7 @@ futures = "0.3.21"
 futures-timer = "3.0.2"
 ip_network = "0.4.1"
 libp2p = { version = "0.51.4", features = ["dns", "identify", "kad", "macros", "mdns", "noise", "ping", "request-response", "tcp", "tokio", "websocket", "yamux"] }
-linked_hash_set = "0.1.3"
+linked_hash_set = "0.1.4"
 log = { workspace = true, default-features = true }
 mockall = "0.11.3"
 parking_lot = "0.12.1"
diff --git a/substrate/client/network/src/discovery.rs b/substrate/client/network/src/discovery.rs
index 77c26266aac4691cf36b440e6d06c792e57a311e..4e2121c5540d6c7c6d895e2756b266f6b5a945e8 100644
--- a/substrate/client/network/src/discovery.rs
+++ b/substrate/client/network/src/discovery.rs
@@ -72,6 +72,7 @@ use libp2p::{
 	},
 	PeerId,
 };
+use linked_hash_set::LinkedHashSet;
 use log::{debug, info, trace, warn};
 use sp_core::hexdisplay::HexDisplay;
 use std::{
@@ -550,14 +551,20 @@ impl NetworkBehaviour for DiscoveryBehaviour {
 	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
 		let Some(peer_id) = maybe_peer else { return Ok(Vec::new()) };
 
-		let mut list = self
+		// Collect addresses into [`LinkedHashSet`] to eliminate duplicate entries preserving the
+		// order of addresses. Give priority to `permanent_addresses` (used with reserved nodes) and
+		// `ephemeral_addresses` (used for addresses discovered from other sources, like authority
+		// discovery DHT records).
+		let mut list: LinkedHashSet<_> = self
 			.permanent_addresses
 			.iter()
 			.filter_map(|(p, a)| (*p == peer_id).then_some(a.clone()))
-			.collect::<Vec<_>>();
+			.collect();
 
 		if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
-			list.extend(ephemeral_addresses.clone());
+			ephemeral_addresses.iter().for_each(|address| {
+				list.insert_if_absent(address.clone());
+			});
 		}
 
 		{
@@ -583,12 +590,14 @@ impl NetworkBehaviour for DiscoveryBehaviour {
 				});
 			}
 
-			list.extend(list_to_filter);
+			list_to_filter.into_iter().for_each(|address| {
+				list.insert_if_absent(address);
+			});
 		}
 
 		trace!(target: "sub-libp2p", "Addresses of {:?}: {:?}", peer_id, list);
 
-		Ok(list)
+		Ok(list.into_iter().collect())
 	}
 
 	fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
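Illustrative usage (not part of the patch): an embedder can forward the operator-supplied public addresses (e.g. `config.network.public_addresses`, as in the call sites above) into the new `WorkerConfig::public_addresses` field. This is a minimal sketch; the helper name is hypothetical, and `libp2p::Multiaddr` is used because it is the address type imported by `worker.rs` in this diff.

	use libp2p::Multiaddr;
	use sc_authority_discovery::WorkerConfig;

	/// Hypothetical helper: build the authority discovery worker configuration
	/// from the node operator's public addresses.
	fn authority_discovery_config(public_addresses: Vec<Multiaddr>) -> WorkerConfig {
		WorkerConfig {
			// These addresses are published first in the DHT record; the worker strips
			// any trailing `/p2p/...` component and re-appends the local peer id when
			// publishing.
			public_addresses,
			// Require that authority discovery records are signed.
			strict_record_validation: true,
			..Default::default()
		}
	}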