diff --git a/Cargo.lock b/Cargo.lock index aea00abb8f036e068fe1f8120056a5da97b72442..c26c4081bbd9d50b13fe5a0ad721a5ae9cce25a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12325,6 +12325,7 @@ dependencies = [ "futures", "futures-timer", "lazy_static", + "parking_lot 0.12.1", "polkadot-node-network-protocol", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", diff --git a/polkadot/node/network/gossip-support/Cargo.toml b/polkadot/node/network/gossip-support/Cargo.toml index c17f39b019de1f6f248236c387742a9033e60df8..8d0edc206d728b16e0448c3339c1719f26d81413 100644 --- a/polkadot/node/network/gossip-support/Cargo.toml +++ b/polkadot/node/network/gossip-support/Cargo.toml @@ -38,5 +38,6 @@ polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } assert_matches = "1.4.0" async-trait = "0.1.74" +parking_lot = "0.12.1" lazy_static = "1.4.0" quickcheck = "1.0.3" diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index e9cb8a4de1c47b5e3f6e285f86ad38b3cf5ac342..4dfdd1f7208f66ec4a787ffe8bc7f72c41575c9d 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -63,8 +63,12 @@ use metrics::Metrics; const LOG_TARGET: &str = "parachain::gossip-support"; // How much time should we wait to reissue a connection request // since the last authority discovery resolution failure. +#[cfg(not(test))] const BACKOFF_DURATION: Duration = Duration::from_secs(5); +#[cfg(test)] +const BACKOFF_DURATION: Duration = Duration::from_millis(500); + /// Duration after which we consider low connectivity a problem. /// /// Especially at startup low connectivity is expected (authority discovery cache needs to be @@ -271,8 +275,8 @@ where ) .await?; } - // authority_discovery is just a cache so let's try every leaf to detect if there - // are new authorities there. + // authority_discovery is just a cache so let's try every time we try to re-connect + // if new authorities are present. self.update_authority_ids(sender, session_info.discovery_keys).await; } } diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs index e5ee101c31d857b2dbd540596649ddaf9b826bd5..6817c85f98d87c03b3c252783c464efed88dfdb6 100644 --- a/polkadot/node/network/gossip-support/src/tests.rs +++ b/polkadot/node/network/gossip-support/src/tests.rs @@ -25,13 +25,19 @@ use lazy_static::lazy_static; use quickcheck::quickcheck; use rand::seq::SliceRandom as _; +use parking_lot::Mutex; use sc_network::multiaddr::Protocol; use sp_authority_discovery::AuthorityPair as AuthorityDiscoveryPair; use sp_consensus_babe::{AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch}; use sp_core::crypto::Pair as PairT; use sp_keyring::Sr25519Keyring; +use std::sync::Arc; -use polkadot_node_network_protocol::grid_topology::{SessionGridTopology, TopologyPeerInfo}; +use polkadot_node_network_protocol::{ + grid_topology::{SessionGridTopology, TopologyPeerInfo}, + peer_set::ValidationVersion, + ObservedRole, +}; use polkadot_node_subsystem::messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest}; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt as _; @@ -51,7 +57,6 @@ const AUTHORITY_KEYRINGS: &[Sr25519Keyring] = &[ ]; lazy_static! { - static ref MOCK_AUTHORITY_DISCOVERY: MockAuthorityDiscovery = MockAuthorityDiscovery::new(); static ref AUTHORITIES: Vec<AuthorityDiscoveryId> = AUTHORITY_KEYRINGS.iter().map(|k| k.public().into()).collect(); @@ -89,17 +94,14 @@ type VirtualOverseer = test_helpers::TestSubsystemContextHandle<GossipSupportMes #[derive(Debug, Clone)] struct MockAuthorityDiscovery { - addrs: HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>, - authorities: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>, + addrs: Arc<Mutex<HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>>>, + authorities: Arc<Mutex<HashMap<PeerId, HashSet<AuthorityDiscoveryId>>>>, } impl MockAuthorityDiscovery { - fn new() -> Self { - let authorities: HashMap<_, _> = PAST_PRESENT_FUTURE_AUTHORITIES - .clone() - .into_iter() - .map(|a| (PeerId::random(), a)) - .collect(); + fn new(authorities: Vec<AuthorityDiscoveryId>) -> Self { + let authorities: HashMap<_, _> = + authorities.clone().into_iter().map(|a| (PeerId::random(), a)).collect(); let addrs = authorities .clone() .into_iter() @@ -109,10 +111,37 @@ impl MockAuthorityDiscovery { }) .collect(); Self { - addrs, - authorities: authorities.into_iter().map(|(p, a)| (p, HashSet::from([a]))).collect(), + addrs: Arc::new(Mutex::new(addrs)), + authorities: Arc::new(Mutex::new( + authorities.into_iter().map(|(p, a)| (p, HashSet::from([a]))).collect(), + )), } } + + fn authorities(&self) -> HashMap<PeerId, HashSet<AuthorityDiscoveryId>> { + self.authorities.lock().clone() + } + + fn add_more_authorties( + &self, + new_known: Vec<AuthorityDiscoveryId>, + ) -> HashMap<PeerId, HashSet<AuthorityDiscoveryId>> { + let authorities: HashMap<_, _> = + new_known.clone().into_iter().map(|a| (PeerId::random(), a)).collect(); + let addrs: HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>> = authorities + .clone() + .into_iter() + .map(|(p, a)| { + let multiaddr = Multiaddr::empty().with(Protocol::P2p(p.into())); + (a, HashSet::from([multiaddr])) + }) + .collect(); + let authorities: HashMap<PeerId, HashSet<AuthorityDiscoveryId>> = + authorities.into_iter().map(|(p, a)| (p, HashSet::from([a]))).collect(); + self.addrs.as_ref().lock().extend(addrs); + self.authorities.as_ref().lock().extend(authorities.clone()); + authorities + } } #[async_trait] @@ -121,19 +150,23 @@ impl AuthorityDiscovery for MockAuthorityDiscovery { &mut self, authority: polkadot_primitives::AuthorityDiscoveryId, ) -> Option<HashSet<sc_network::Multiaddr>> { - self.addrs.get(&authority).cloned() + self.addrs.lock().get(&authority).cloned() } + async fn get_authority_ids_by_peer_id( &mut self, peer_id: polkadot_node_network_protocol::PeerId, ) -> Option<HashSet<polkadot_primitives::AuthorityDiscoveryId>> { - self.authorities.get(&peer_id).cloned() + self.authorities.as_ref().lock().get(&peer_id).cloned() } } -async fn get_multiaddrs(authorities: Vec<AuthorityDiscoveryId>) -> Vec<HashSet<Multiaddr>> { +async fn get_multiaddrs( + authorities: Vec<AuthorityDiscoveryId>, + mock_authority_discovery: MockAuthorityDiscovery, +) -> Vec<HashSet<Multiaddr>> { let mut addrs = Vec::with_capacity(authorities.len()); - let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone(); + let mut discovery = mock_authority_discovery.clone(); for authority in authorities.into_iter() { if let Some(addr) = discovery.get_addresses_by_authority_id(authority).await { addrs.push(addr); @@ -144,9 +177,10 @@ async fn get_multiaddrs(authorities: Vec<AuthorityDiscoveryId>) -> Vec<HashSet<M async fn get_address_map( authorities: Vec<AuthorityDiscoveryId>, + mock_authority_discovery: MockAuthorityDiscovery, ) -> HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>> { let mut addrs = HashMap::with_capacity(authorities.len()); - let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone(); + let mut discovery = mock_authority_discovery.clone(); for authority in authorities.into_iter() { if let Some(addr) = discovery.get_addresses_by_authority_id(authority.clone()).await { addrs.insert(authority, addr); @@ -155,12 +189,10 @@ async fn get_address_map( addrs } -fn make_subsystem() -> GossipSupport<MockAuthorityDiscovery> { - GossipSupport::new( - make_ferdie_keystore(), - MOCK_AUTHORITY_DISCOVERY.clone(), - Metrics::new_dummy(), - ) +fn make_subsystem_with_authority_discovery( + mock: MockAuthorityDiscovery, +) -> GossipSupport<MockAuthorityDiscovery> { + GossipSupport::new(make_ferdie_keystore(), mock, Metrics::new_dummy()) } fn test_harness<T: Future<Output = VirtualOverseer>, AD: AuthorityDiscovery>( @@ -291,59 +323,65 @@ async fn test_neighbors(overseer: &mut VirtualOverseer, expected_session: Sessio #[test] fn issues_a_connection_request_on_new_session() { + let mock_authority_discovery = + MockAuthorityDiscovery::new(PAST_PRESENT_FUTURE_AUTHORITIES.clone()); + let mock_authority_discovery_clone = mock_authority_discovery.clone(); let hash = Hash::repeat_byte(0xAA); - let state = test_harness(make_subsystem(), |mut virtual_overseer| async move { - let overseer = &mut virtual_overseer; - overseer_signal_active_leaves(overseer, hash).await; - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(1)).unwrap(); - } - ); + let state = test_harness( + make_subsystem_with_authority_discovery(mock_authority_discovery.clone()), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionInfo(s, tx), - )) => { - assert_eq!(relay_parent, hash); - assert_eq!(s, 1); - tx.send(Ok(Some(make_session_info()))).unwrap(); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Authorities(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(AUTHORITIES.clone())).unwrap(); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(AUTHORITIES.clone())).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { - validator_addrs, - peer_set, - }) => { - assert_eq!(validator_addrs, get_multiaddrs(AUTHORITIES_WITHOUT_US.clone()).await); - assert_eq!(peer_set, PeerSet::Validation); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + assert_eq!(validator_addrs, get_multiaddrs(AUTHORITIES_WITHOUT_US.clone(), mock_authority_discovery_clone).await); + assert_eq!(peer_set, PeerSet::Validation); + } + ); - test_neighbors(overseer, 1).await; + test_neighbors(overseer, 1).await; - virtual_overseer - }); + virtual_overseer + }, + ); assert_eq!(state.last_session_index, Some(1)); assert!(state.last_failure.is_none()); @@ -363,6 +401,7 @@ fn issues_a_connection_request_on_new_session() { tx.send(Ok(1)).unwrap(); } ); + virtual_overseer }); @@ -414,7 +453,7 @@ fn issues_a_connection_request_on_new_session() { validator_addrs, peer_set, }) => { - assert_eq!(validator_addrs, get_multiaddrs(AUTHORITIES_WITHOUT_US.clone()).await); + assert_eq!(validator_addrs, get_multiaddrs(AUTHORITIES_WITHOUT_US.clone(), mock_authority_discovery.clone()).await); assert_eq!(peer_set, PeerSet::Validation); } ); @@ -430,125 +469,405 @@ fn issues_a_connection_request_on_new_session() { #[test] fn issues_connection_request_to_past_present_future() { let hash = Hash::repeat_byte(0xAA); - test_harness(make_subsystem(), |mut virtual_overseer| async move { - let overseer = &mut virtual_overseer; - overseer_signal_active_leaves(overseer, hash).await; - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(1)).unwrap(); + let mock_authority_discovery = + MockAuthorityDiscovery::new(PAST_PRESENT_FUTURE_AUTHORITIES.clone()); + test_harness( + make_subsystem_with_authority_discovery(mock_authority_discovery.clone()), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + let all_without_ferdie: Vec<_> = PAST_PRESENT_FUTURE_AUTHORITIES + .iter() + .cloned() + .filter(|p| p != &Sr25519Keyring::Ferdie.public().into()) + .collect(); + + let addrs = get_multiaddrs(all_without_ferdie, mock_authority_discovery.clone()).await; + + assert_eq!(validator_addrs, addrs); + assert_eq!(peer_set, PeerSet::Validation); + } + ); + + // Ensure neighbors are unaffected + test_neighbors(overseer, 1).await; + + virtual_overseer + }, + ); +} + +// Test we notify peer about learning of the authority ID after session boundary, when we couldn't +// connect to more than 1/3 of the authorities. +#[test] +fn issues_update_authorities_after_session() { + let hash = Hash::repeat_byte(0xAA); + + let mut authorities = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + let unknown_at_session = authorities.split_off(authorities.len() / 3 - 1); + let mut authority_discovery_mock = MockAuthorityDiscovery::new(authorities); + + test_harness( + make_subsystem_with_authority_discovery(authority_discovery_mock.clone()), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // 1. Initialize with the first leaf in the session. + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + let all_without_ferdie: Vec<_> = PAST_PRESENT_FUTURE_AUTHORITIES + .iter() + .cloned() + .filter(|p| p != &Sr25519Keyring::Ferdie.public().into()) + .collect(); + + let addrs = get_multiaddrs(all_without_ferdie, authority_discovery_mock.clone()).await; + + assert_eq!(validator_addrs, addrs); + assert_eq!(peer_set, PeerSet::Validation); + } + ); + + // Ensure neighbors are unaffected + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::CurrentBabeEpoch(tx), + )) => { + let _ = tx.send(Ok(BabeEpoch { + epoch_index: 2 as _, + start_slot: 0.into(), + duration: 200, + authorities: vec![(Sr25519Keyring::Alice.public().into(), 1)], + randomness: [0u8; 32], + config: BabeEpochConfiguration { + c: (1, 4), + allowed_slots: AllowedSlots::PrimarySlots, + }, + })).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::NewGossipTopology { + session: _, + local_index: _, + canonical_shuffling: _, + shuffled_indices: _, + }) => { + + } + ); + + // 2. Connect all authorities that are known so far. + let known_authorities = authority_discovery_mock.authorities(); + for (peer_id, _id) in known_authorities.iter() { + let msg = + GossipSupportMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( + *peer_id, + ObservedRole::Authority, + ValidationVersion::V3.into(), + None, + )); + overseer.send(FromOrchestra::Communication { msg }).await } - ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionInfo(s, tx), - )) => { - assert_eq!(relay_parent, hash); - assert_eq!(s, 1); - tx.send(Ok(Some(make_session_info()))).unwrap(); + Delay::new(BACKOFF_DURATION).await; + // 3. Send a new leaf after BACKOFF_DURATION and check UpdateAuthority is emitted for + // all known connected peers. + let hash = Hash::repeat_byte(0xBB); + overseer_signal_active_leaves(overseer, hash).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs: _, + peer_set: _, + }) => { + } + ); + + for _ in 0..known_authorities.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds { + peer_id, + authority_ids, + }) => { + assert_eq!(authority_discovery_mock.get_authority_ids_by_peer_id(peer_id).await.unwrap_or_default(), authority_ids); + } + ); } - ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Authorities(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + assert!(overseer.recv().timeout(TIMEOUT).await.is_none()); + // 4. Connect more authorities except one + let newly_added = authority_discovery_mock.add_more_authorties(unknown_at_session); + let mut newly_added_iter = newly_added.iter(); + let unconnected_at_last_retry = newly_added_iter + .next() + .map(|(peer_id, authority_id)| (*peer_id, authority_id.clone())) + .unwrap(); + for (peer_id, _) in newly_added_iter { + let msg = + GossipSupportMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected( + *peer_id, + ObservedRole::Authority, + ValidationVersion::V3.into(), + None, + )); + overseer.send(FromOrchestra::Communication { msg }).await } - ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { - validator_addrs, - peer_set, - }) => { - let all_without_ferdie: Vec<_> = PAST_PRESENT_FUTURE_AUTHORITIES - .iter() - .cloned() - .filter(|p| p != &Sr25519Keyring::Ferdie.public().into()) - .collect(); + // 5. Send a new leaf and check UpdateAuthority is emitted only for the newly connected + // peers. + let hash = Hash::repeat_byte(0xCC); + Delay::new(BACKOFF_DURATION).await; + overseer_signal_active_leaves(overseer, hash).await; - let addrs = get_multiaddrs(all_without_ferdie).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); - assert_eq!(validator_addrs, addrs); - assert_eq!(peer_set, PeerSet::Validation); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut session_info = make_session_info(); + session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone(); + tx.send(Ok(Some(session_info))).unwrap(); + } + ); - // Ensure neighbors are unaffected - test_neighbors(overseer, 1).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); - virtual_overseer - }); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs: _, + peer_set: _, + }) => { + } + ); + + for _ in 1..newly_added.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds { + peer_id, + authority_ids, + }) => { + assert_ne!(peer_id, unconnected_at_last_retry.0); + assert_eq!(newly_added.get(&peer_id).cloned().unwrap_or_default(), authority_ids); + } + ); + } + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none()); + virtual_overseer + }, + ); } #[test] fn disconnect_when_not_in_past_present_future() { sp_tracing::try_init_simple(); + let mock_authority_discovery = + MockAuthorityDiscovery::new(PAST_PRESENT_FUTURE_AUTHORITIES.clone()); let hash = Hash::repeat_byte(0xAA); - test_harness(make_subsystem(), |mut virtual_overseer| async move { - let overseer = &mut virtual_overseer; - overseer_signal_active_leaves(overseer, hash).await; - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(1)).unwrap(); - } - ); + test_harness( + make_subsystem_with_authority_discovery(mock_authority_discovery.clone()), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionInfo(s, tx), - )) => { - assert_eq!(relay_parent, hash); - assert_eq!(s, 1); - let mut heute_leider_nicht = make_session_info(); - heute_leider_nicht.discovery_keys = AUTHORITIES_WITHOUT_US.clone(); - tx.send(Ok(Some(heute_leider_nicht))).unwrap(); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + let mut heute_leider_nicht = make_session_info(); + heute_leider_nicht.discovery_keys = AUTHORITIES_WITHOUT_US.clone(); + tx.send(Ok(Some(heute_leider_nicht))).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Authorities(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(AUTHORITIES_WITHOUT_US.clone())).unwrap(); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(AUTHORITIES_WITHOUT_US.clone())).unwrap(); + } + ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { - validator_addrs, - peer_set, - }) => { - assert!(validator_addrs.is_empty()); - assert_eq!(peer_set, PeerSet::Validation); - } - ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + assert!(validator_addrs.is_empty()); + assert_eq!(peer_set, PeerSet::Validation); + } + ); - virtual_overseer - }); + virtual_overseer + }, + ); } #[test] @@ -579,13 +898,15 @@ fn test_log_output() { #[test] fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { let hash = Hash::repeat_byte(0xAA); - let mut state = make_subsystem(); + let mock_authority_discovery = + MockAuthorityDiscovery::new(PAST_PRESENT_FUTURE_AUTHORITIES.clone()); + let state = make_subsystem_with_authority_discovery(mock_authority_discovery.clone()); // There will be two lookup failures: let alice = Sr25519Keyring::Alice.public().into(); let bob = Sr25519Keyring::Bob.public().into(); - let alice_addr = state.authority_discovery.addrs.remove(&alice); - state.authority_discovery.addrs.remove(&bob); - + let alice_addr = state.authority_discovery.addrs.lock().remove(&alice); + state.authority_discovery.addrs.lock().remove(&bob); + let mock_authority_discovery_clone = mock_authority_discovery.clone(); let mut state = { let alice = alice.clone(); let bob = bob.clone(); @@ -633,7 +954,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { validator_addrs, peer_set, }) => { - let mut expected = get_address_map(AUTHORITIES_WITHOUT_US.clone()).await; + let mut expected = get_address_map(AUTHORITIES_WITHOUT_US.clone(), mock_authority_discovery_clone.clone()).await; expected.remove(&alice); expected.remove(&bob); let expected: HashSet<Multiaddr> = expected.into_values().flat_map(|v| v.into_iter()).collect(); @@ -652,7 +973,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { assert!(state.last_failure.is_some()); state.last_failure = state.last_failure.and_then(|i| i.checked_sub(BACKOFF_DURATION)); // One error less: - state.authority_discovery.addrs.insert(alice, alice_addr.unwrap()); + state.authority_discovery.addrs.lock().insert(alice, alice_addr.unwrap()); let hash = Hash::repeat_byte(0xBB); let state = test_harness(state, |mut virtual_overseer| async move { @@ -698,7 +1019,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { validator_addrs, peer_set, }) => { - let mut expected = get_address_map(AUTHORITIES_WITHOUT_US.clone()).await; + let mut expected = get_address_map(AUTHORITIES_WITHOUT_US.clone(), mock_authority_discovery.clone()).await; expected.remove(&bob); let expected: HashSet<Multiaddr> = expected.into_values().flat_map(|v| v.into_iter()).collect(); assert_eq!(validator_addrs.into_iter().flat_map(|v| v.into_iter()).collect::<HashSet<_>>(), expected);