diff --git a/substrate/client/authority-discovery/src/lib.rs b/substrate/client/authority-discovery/src/lib.rs index 800f683aa0aefd5b9914b03b3725acb470488290..1bbb9f38796c28e0053d192d6cc3c907157802d2 100644 --- a/substrate/client/authority-discovery/src/lib.rs +++ b/substrate/client/authority-discovery/src/lib.rs @@ -18,6 +18,7 @@ #![warn(missing_docs)] #![recursion_limit = "1024"] + //! Substrate authority discovery. //! //! This crate enables Substrate authorities to discover and directly connect to @@ -31,7 +32,7 @@ pub use crate::{ worker::{NetworkProvider, Role, Worker}, }; -use std::{sync::Arc, time::Duration}; +use std::{collections::HashSet, sync::Arc, time::Duration}; use futures::{ channel::{mpsc, oneshot}, @@ -58,11 +59,13 @@ pub struct WorkerConfig { /// /// By default this is set to 1 hour. pub max_publish_interval: Duration, + /// Interval at which the keystore is queried. If the keys have changed, unconditionally /// re-publish its addresses on the DHT. /// /// By default this is set to 1 minute. pub keystore_refresh_interval: Duration, + /// The maximum interval in which the node will query the DHT for new entries. /// /// By default this is set to 10 minutes. @@ -156,7 +159,7 @@ where /// Message send from the [`Service`] to the [`Worker`]. pub(crate) enum ServicetoWorkerMsg { /// See [`Service::get_addresses_by_authority_id`]. - GetAddressesByAuthorityId(AuthorityId, oneshot::Sender<Option<Vec<Multiaddr>>>), - /// See [`Service::get_authority_id_by_peer_id`]. - GetAuthorityIdByPeerId(PeerId, oneshot::Sender<Option<AuthorityId>>), + GetAddressesByAuthorityId(AuthorityId, oneshot::Sender<Option<HashSet<Multiaddr>>>), + /// See [`Service::get_authority_ids_by_peer_id`]. + GetAuthorityIdsByPeerId(PeerId, oneshot::Sender<Option<HashSet<AuthorityId>>>), } diff --git a/substrate/client/authority-discovery/src/service.rs b/substrate/client/authority-discovery/src/service.rs index 2e5ae66e4dd4a0e193af551082b77a42aac8d0e0..9b59a4ec8647f55692e50b61a9844b24319cbb1e 100644 --- a/substrate/client/authority-discovery/src/service.rs +++ b/substrate/client/authority-discovery/src/service.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use std::fmt::Debug; +use std::{collections::HashSet, fmt::Debug}; use crate::ServicetoWorkerMsg; @@ -62,7 +62,7 @@ impl Service { pub async fn get_addresses_by_authority_id( &mut self, authority: AuthorityId, - ) -> Option<Vec<Multiaddr>> { + ) -> Option<HashSet<Multiaddr>> { let (tx, rx) = oneshot::channel(); self.to_worker @@ -78,11 +78,14 @@ impl Service { /// /// Returns `None` if no entry was present or connection to the /// [`crate::Worker`] failed. - pub async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityId> { + pub async fn get_authority_ids_by_peer_id( + &mut self, + peer_id: PeerId, + ) -> Option<HashSet<AuthorityId>> { let (tx, rx) = oneshot::channel(); self.to_worker - .send(ServicetoWorkerMsg::GetAuthorityIdByPeerId(peer_id, tx)) + .send(ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, tx)) .await .ok()?; diff --git a/substrate/client/authority-discovery/src/tests.rs b/substrate/client/authority-discovery/src/tests.rs index 3784b4c834266fe1565d8e1afdd383b8b09cfb78..cef91445064caf4b10236fa43bed4ac3f6e1b716 100644 --- a/substrate/client/authority-discovery/src/tests.rs +++ b/substrate/client/authority-discovery/src/tests.rs @@ -29,7 +29,7 @@ use libp2p::core::{ multiaddr::{Multiaddr, Protocol}, PeerId, }; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use sp_authority_discovery::AuthorityId; use sp_core::crypto::key_types; @@ -73,12 +73,12 @@ fn get_addresses_and_authority_id() { pool.run_until(async { assert_eq!( - Some(vec![remote_addr]), + Some(HashSet::from([remote_addr])), service.get_addresses_by_authority_id(remote_authority_id.clone()).await, ); assert_eq!( - Some(remote_authority_id), - service.get_authority_id_by_peer_id(remote_peer_id).await, + Some(HashSet::from([remote_authority_id])), + service.get_authority_ids_by_peer_id(remote_peer_id).await, ); }); } diff --git a/substrate/client/authority-discovery/src/worker.rs b/substrate/client/authority-discovery/src/worker.rs index a689d0bafd2624fe9d5f1753e7f059de3d7be50f..00021ecbdcb833cc4e8f4e985b3848b1bc1a7ae8 100644 --- a/substrate/client/authority-discovery/src/worker.rs +++ b/substrate/client/authority-discovery/src/worker.rs @@ -259,9 +259,9 @@ where self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone), ); }, - ServicetoWorkerMsg::GetAuthorityIdByPeerId(peer_id, sender) => { + ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, sender) => { let _ = sender - .send(self.addr_cache.get_authority_id_by_peer_id(&peer_id).map(Clone::clone)); + .send(self.addr_cache.get_authority_ids_by_peer_id(&peer_id).map(Clone::clone)); }, } } @@ -374,7 +374,7 @@ where .map_err(|e| Error::CallingRuntime(e.into()))? .into_iter() .filter(|id| !local_keys.contains(id.as_ref())) - .collect(); + .collect::<Vec<_>>(); self.addr_cache.retain_ids(&authorities); @@ -548,7 +548,7 @@ where if let Some(metrics) = &self.metrics { metrics .known_authorities_count - .set(self.addr_cache.num_ids().try_into().unwrap_or(std::u64::MAX)); + .set(self.addr_cache.num_authority_ids().try_into().unwrap_or(std::u64::MAX)); } } Ok(()) diff --git a/substrate/client/authority-discovery/src/worker/addr_cache.rs b/substrate/client/authority-discovery/src/worker/addr_cache.rs index e770297f6f3bed4cfc3a74743bd71225a37ba6fa..d4ba156d5fa191935dbef85497f221870f9d2fb4 100644 --- a/substrate/client/authority-discovery/src/worker/addr_cache.rs +++ b/substrate/client/authority-discovery/src/worker/addr_cache.rs @@ -17,79 +17,94 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. use libp2p::core::multiaddr::{Multiaddr, Protocol}; -use std::collections::HashMap; use sc_network::PeerId; use sp_authority_discovery::AuthorityId; +use std::collections::{hash_map::Entry, HashMap, HashSet}; -/// Cache for [`AuthorityId`] -> [`Vec<Multiaddr>`] and [`PeerId`] -> [`AuthorityId`] mappings. +/// Cache for [`AuthorityId`] -> [`HashSet<Multiaddr>`] and [`PeerId`] -> [`HashSet<AuthorityId>`] +/// mappings. pub(super) struct AddrCache { - // The addresses found in `authority_id_to_addresses` are guaranteed to always match - // the peerids found in `peer_id_to_authority_id`. In other words, these two hashmaps - // are similar to a bi-directional map. - authority_id_to_addresses: HashMap<AuthorityId, Vec<Multiaddr>>, - peer_id_to_authority_id: HashMap<PeerId, AuthorityId>, + /// The addresses found in `authority_id_to_addresses` are guaranteed to always match + /// the peerids found in `peer_id_to_authority_ids`. In other words, these two hashmaps + /// are similar to a bi-directional map. + /// + /// Since we may store the mapping across several sessions, a single + /// `PeerId` might correspond to multiple `AuthorityId`s. However, + /// it's not expected that a single `AuthorityId` can have multiple `PeerId`s. + authority_id_to_addresses: HashMap<AuthorityId, HashSet<Multiaddr>>, + peer_id_to_authority_ids: HashMap<PeerId, HashSet<AuthorityId>>, } impl AddrCache { pub fn new() -> Self { AddrCache { authority_id_to_addresses: HashMap::new(), - peer_id_to_authority_id: HashMap::new(), + peer_id_to_authority_ids: HashMap::new(), } } /// Inserts the given [`AuthorityId`] and [`Vec<Multiaddr>`] pair for future lookups by /// [`AuthorityId`] or [`PeerId`]. - pub fn insert(&mut self, authority_id: AuthorityId, mut addresses: Vec<Multiaddr>) { - addresses.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref())); + pub fn insert(&mut self, authority_id: AuthorityId, addresses: Vec<Multiaddr>) { + let addresses = addresses.into_iter().collect::<HashSet<_>>(); + let peer_ids = addresses_to_peer_ids(&addresses); + + if peer_ids.is_empty() { + log::debug!( + target: super::LOG_TARGET, + "Authority({:?}) provides no addresses or addresses without peer ids. Adresses: {:?}", + authority_id, + addresses, + ); - // Insert into `self.peer_id_to_authority_id`. - let peer_ids = addresses - .iter() - .map(|a| peer_id_from_multiaddr(a)) - .filter_map(|peer_id| peer_id); - for peer_id in peer_ids.clone() { - let former_auth = - match self.peer_id_to_authority_id.insert(peer_id, authority_id.clone()) { - Some(a) if a != authority_id => a, - _ => continue, - }; - - // PeerId was associated to a different authority id before. - // Remove corresponding authority from `self.authority_id_to_addresses`. - let former_auth_addrs = match self.authority_id_to_addresses.get_mut(&former_auth) { - Some(a) => a, - None => { - debug_assert!(false); - continue - }, - }; - former_auth_addrs.retain(|a| peer_id_from_multiaddr(a).map_or(true, |p| p != peer_id)); + return + } else if peer_ids.len() > 1 { + log::warn!( + target: super::LOG_TARGET, + "Authority({:?}) can be reached through multiple peer ids: {:?}", + authority_id, + peer_ids + ); } - // Insert into `self.authority_id_to_addresses`. - for former_addr in self - .authority_id_to_addresses - .insert(authority_id.clone(), addresses.clone()) - .unwrap_or_default() - { - // Must remove from `self.peer_id_to_authority_id` any PeerId formerly associated - // to that authority but that can't be found in its new addresses. - - let peer_id = match peer_id_from_multiaddr(&former_addr) { - Some(p) => p, - None => continue, - }; + let old_addresses = self.authority_id_to_addresses.insert(authority_id.clone(), addresses); + let old_peer_ids = addresses_to_peer_ids(&old_addresses.unwrap_or_default()); - if !peer_ids.clone().any(|p| p == peer_id) { - self.peer_id_to_authority_id.remove(&peer_id); + // Add the new peer ids + peer_ids.difference(&old_peer_ids).for_each(|new_peer_id| { + self.peer_id_to_authority_ids + .entry(*new_peer_id) + .or_default() + .insert(authority_id.clone()); + }); + + // Remove the old peer ids + self.remove_authority_id_from_peer_ids(&authority_id, old_peer_ids.difference(&peer_ids)); + } + + /// Remove the given `authority_id` from the `peer_id` to `authority_ids` mapping. + /// + /// If a `peer_id` doesn't have any `authority_id` assigned anymore, it is removed. + fn remove_authority_id_from_peer_ids<'a>( + &mut self, + authority_id: &AuthorityId, + peer_ids: impl Iterator<Item = &'a PeerId>, + ) { + peer_ids.for_each(|peer_id| { + if let Entry::Occupied(mut e) = self.peer_id_to_authority_ids.entry(*peer_id) { + e.get_mut().remove(authority_id); + + // If there are no more entries, remove the peer id. + if e.get().is_empty() { + e.remove(); + } } - } + }) } /// Returns the number of authority IDs in the cache. - pub fn num_ids(&self) -> usize { + pub fn num_authority_ids(&self) -> usize { self.authority_id_to_addresses.len() } @@ -97,18 +112,21 @@ impl AddrCache { pub fn get_addresses_by_authority_id( &self, authority_id: &AuthorityId, - ) -> Option<&Vec<Multiaddr>> { - self.authority_id_to_addresses.get(&authority_id) + ) -> Option<&HashSet<Multiaddr>> { + self.authority_id_to_addresses.get(authority_id) } - /// Returns the [`AuthorityId`] for the given [`PeerId`]. - pub fn get_authority_id_by_peer_id(&self, peer_id: &PeerId) -> Option<&AuthorityId> { - self.peer_id_to_authority_id.get(peer_id) + /// Returns the [`AuthorityId`]s for the given [`PeerId`]. + /// + /// As the authority id can change between sessions, one [`PeerId`] can be mapped to + /// multiple authority ids. + pub fn get_authority_ids_by_peer_id(&self, peer_id: &PeerId) -> Option<&HashSet<AuthorityId>> { + self.peer_id_to_authority_ids.get(peer_id) } /// Removes all [`PeerId`]s and [`Multiaddr`]s from the cache that are not related to the given /// [`AuthorityId`]s. - pub fn retain_ids(&mut self, authority_ids: &Vec<AuthorityId>) { + pub fn retain_ids(&mut self, authority_ids: &[AuthorityId]) { // The below logic could be replaced by `BtreeMap::drain_filter` once it stabilized. let authority_ids_to_remove = self .authority_id_to_addresses @@ -120,19 +138,18 @@ impl AddrCache { for authority_id_to_remove in authority_ids_to_remove { // Remove other entries from `self.authority_id_to_addresses`. - let addresses = self.authority_id_to_addresses.remove(&authority_id_to_remove); - - // Remove other entries from `self.peer_id_to_authority_id`. - let peer_ids = addresses - .iter() - .flatten() - .map(|a| peer_id_from_multiaddr(a)) - .filter_map(|peer_id| peer_id); - for peer_id in peer_ids { - if let Some(id) = self.peer_id_to_authority_id.remove(&peer_id) { - debug_assert_eq!(authority_id_to_remove, id); - } - } + let addresses = if let Some(addresses) = + self.authority_id_to_addresses.remove(&authority_id_to_remove) + { + addresses + } else { + continue + }; + + self.remove_authority_id_from_peer_ids( + &authority_id_to_remove, + addresses_to_peer_ids(&addresses).iter(), + ); } } } @@ -147,6 +164,13 @@ fn peer_id_from_multiaddr(addr: &Multiaddr) -> Option<PeerId> { }) } +fn addresses_to_peer_ids(addresses: &HashSet<Multiaddr>) -> HashSet<PeerId> { + addresses + .iter() + .filter_map(|a| peer_id_from_multiaddr(a)) + .collect::<HashSet<_>>() +} + #[cfg(test)] mod tests { use super::*; @@ -226,27 +250,27 @@ mod tests { cache.insert(third.0.clone(), vec![third.1.clone()]); assert_eq!( - Some(&vec![third.1.clone()]), + Some(&HashSet::from([third.1.clone()])), cache.get_addresses_by_authority_id(&third.0), - "Expect `get_addresses_by_authority_id` to return addresses of third authority." + "Expect `get_addresses_by_authority_id` to return addresses of third authority.", ); assert_eq!( - Some(&third.0), - cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()), - "Expect `get_authority_id_by_peer_id` to return `AuthorityId` of third authority." + Some(&HashSet::from([third.0.clone()])), + cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()), + "Expect `get_authority_id_by_peer_id` to return `AuthorityId` of third authority.", ); - cache.retain_ids(&vec![first.0, second.0]); + cache.retain_ids(&vec![first.0.clone(), second.0]); assert_eq!( None, cache.get_addresses_by_authority_id(&third.0), - "Expect `get_addresses_by_authority_id` to not return `None` for third authority." + "Expect `get_addresses_by_authority_id` to not return `None` for third authority.", ); assert_eq!( None, - cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()), - "Expect `get_authority_id_by_peer_id` to return `None` for third authority." + cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()), + "Expect `get_authority_id_by_peer_id` to return `None` for third authority.", ); TestResult::passed() @@ -282,44 +306,47 @@ mod tests { assert_eq!( None, - cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr1).unwrap()) + cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr1).unwrap()) ); assert_eq!( - Some(&authority1), - cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap()) + Some(&HashSet::from([authority1.clone()])), + cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap()) ); assert_eq!( - Some(&authority1), - cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap()) + Some(&HashSet::from([authority1.clone()])), + cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap()) ); assert_eq!( - Some(&authority1), - cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr4).unwrap()) + Some(&HashSet::from([authority1.clone()])), + cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr4).unwrap()) ); cache.insert(authority2.clone(), vec![multiaddr2.clone()]); assert_eq!( - Some(&authority2), - cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap()) + Some(&HashSet::from([authority2.clone(), authority1.clone()])), + cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap()) ); assert_eq!( - Some(&authority1), - cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap()) + Some(&HashSet::from([authority1.clone()])), + cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap()) ); - assert_eq!(cache.get_addresses_by_authority_id(&authority1).unwrap().len(), 2); + assert_eq!(cache.get_addresses_by_authority_id(&authority1).unwrap().len(), 3); cache.insert(authority2.clone(), vec![multiaddr2.clone(), multiaddr3.clone()]); assert_eq!( - Some(&authority2), - cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap()) + Some(&HashSet::from([authority2.clone(), authority1.clone()])), + cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap()) + ); + assert_eq!( + Some(&HashSet::from([authority2.clone(), authority1.clone()])), + cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap()) ); assert_eq!( - Some(&authority2), - cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap()) + &HashSet::from([multiaddr2.clone(), multiaddr3.clone(), multiaddr4.clone()]), + cache.get_addresses_by_authority_id(&authority1).unwrap(), ); - assert!(cache.get_addresses_by_authority_id(&authority1).unwrap().is_empty()); TestResult::passed() } @@ -328,4 +355,31 @@ mod tests { .max_tests(10) .quickcheck(property as fn(_, _, _, _, _) -> TestResult) } + + /// As the runtime gives us the current + next authority ids, it can happen that some + /// authority changed its session keys. Changing the sessions keys leads to having two + /// authority ids that map to the same `PeerId` & addresses. + #[test] + fn adding_two_authority_ids_for_the_same_peer_id() { + let mut addr_cache = AddrCache::new(); + + let peer_id = PeerId::random(); + let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into())); + + let authority_id0 = AuthorityPair::generate().0.public(); + let authority_id1 = AuthorityPair::generate().0.public(); + + addr_cache.insert(authority_id0.clone(), vec![addr.clone()]); + addr_cache.insert(authority_id1.clone(), vec![addr.clone()]); + + assert_eq!(2, addr_cache.num_authority_ids()); + assert_eq!( + &HashSet::from([addr.clone()]), + addr_cache.get_addresses_by_authority_id(&authority_id0).unwrap() + ); + assert_eq!( + &HashSet::from([addr]), + addr_cache.get_addresses_by_authority_id(&authority_id1).unwrap() + ); + } } diff --git a/substrate/client/authority-discovery/src/worker/tests.rs b/substrate/client/authority-discovery/src/worker/tests.rs index 3c1610256f5bc583546debef6950143a0e20101f..130aea71fdfb0bc6a7b79c84c49de236bf154f8a 100644 --- a/substrate/client/authority-discovery/src/worker/tests.rs +++ b/substrate/client/authority-discovery/src/worker/tests.rs @@ -19,6 +19,7 @@ use crate::worker::schema; use std::{ + collections::HashSet, sync::{Arc, Mutex}, task::Poll, }; @@ -469,7 +470,7 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { .send(ServicetoWorkerMsg::GetAddressesByAuthorityId(remote_public_key, sender)) .await .expect("Channel has capacity of 1."); - assert_eq!(Some(vec![remote_multiaddr]), addresses.await.unwrap()); + assert_eq!(Some(HashSet::from([remote_multiaddr])), addresses.await.unwrap()); }); } @@ -562,7 +563,7 @@ fn do_not_cache_addresses_without_peer_id() { local_worker.handle_dht_value_found_event(vec![dht_event]).unwrap(); assert_eq!( - Some(&vec![multiaddr_with_peer_id]), + Some(&HashSet::from([multiaddr_with_peer_id])), local_worker.addr_cache.get_addresses_by_authority_id(&remote_public.into()), "Expect worker to only cache `Multiaddr`s with `PeerId`s.", );