From a6952c74693be7ff9486f494551d877b6226a3ac Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Thu, 25 Jan 2024 12:58:37 +0200 Subject: [PATCH] approval-distribution: Update topology if authorities are discovered later (#2981) Fixes: https://github.com/paritytech/polkadot-sdk/issues/2138. Especially on restart the AuthorityDiscovery cache is not populated, so we create an invalid topology and messages won't be routed correctly for the entire session. This PR proposes to try to fix this by updating the topology as soon as we know the Authority/PeerId mapping; that should improve the situation dramatically. [This issue was hit yesterday](https://grafana.teleport.parity.io/goto/o9q2625Sg?orgId=1), on Westend and resulted in stalling the finality. # TODO - [x] Unit tests - [x] Test impact on versi --------- Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io> --- .../network/approval-distribution/src/lib.rs | 57 ++- .../approval-distribution/src/tests.rs | 383 ++++++++++++++++-- .../network/bitfield-distribution/src/lib.rs | 7 +- .../node/network/gossip-support/src/lib.rs | 5 +- .../network/protocol/src/grid_topology.rs | 59 ++- .../src/legacy_v1/mod.rs | 4 +- 6 files changed, 473 insertions(+), 42 deletions(-) diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 768af6f70e7..c5a8b0a6e9e 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -667,9 +667,12 @@ impl State { rng: &mut (impl CryptoRng + Rng), ) { match event { - NetworkBridgeEvent::PeerConnected(peer_id, role, version, _) => { + NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => { + gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected"); + if let Some(authority_ids) = authority_ids { + self.topologies.update_authority_ids(peer_id, 
&authority_ids); + } // insert a blank view if none already present - gum::trace!(target: LOG_TARGET, ?peer_id, ?role, "Peer connected"); self.peer_views .entry(peer_id) .or_insert(PeerEntry { view: Default::default(), version }); @@ -716,8 +719,41 @@ impl State { NetworkBridgeEvent::PeerMessage(peer_id, message) => { self.process_incoming_peer_message(ctx, metrics, peer_id, message, rng).await; }, - NetworkBridgeEvent::UpdatedAuthorityIds { .. } => { - // The approval-distribution subsystem doesn't deal with `AuthorityDiscoveryId`s. + NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => { + gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids"); + // If we learn about a new PeerId for an authority ids we need to try to route the + // messages that should have sent to that validator according to the topology. + if self.topologies.update_authority_ids(peer_id, &authority_ids) { + if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) { + let intersection = self + .blocks_by_number + .iter() + .filter(|(block_number, _)| *block_number > &view.finalized_number) + .flat_map(|(_, hashes)| { + hashes.iter().filter(|hash| { + self.blocks + .get(&hash) + .map(|block| block.known_by.get(&peer_id).is_some()) + .unwrap_or_default() + }) + }); + let view_intersection = + View::new(intersection.cloned(), view.finalized_number); + Self::unify_with_peer( + ctx.sender(), + metrics, + &mut self.blocks, + &self.topologies, + self.peer_views.len(), + peer_id, + *version, + view_intersection, + rng, + true, + ) + .await; + } + } }, } } @@ -789,6 +825,7 @@ impl State { *version, view_intersection, rng, + false, ) .await; } @@ -1101,6 +1138,7 @@ impl State { protocol_version, view, rng, + false, ) .await; } @@ -1838,6 +1876,7 @@ impl State { protocol_version: ProtocolVersion, view: View, rng: &mut (impl CryptoRng + Rng), + retry_known_blocks: bool, ) { metrics.on_unify_with_peer(); let _timer = metrics.time_unify_with_peer(); @@ 
-1856,10 +1895,12 @@ impl State { _ => break, }; - // Any peer which is in the `known_by` set has already been - // sent all messages it's meant to get for that block and all - // in-scope prior blocks. - if entry.known_by.contains_key(&peer_id) { + // Any peer which is in the `known_by` see and we know its peer_id authorithy id + // mapping has already been sent all messages it's meant to get for that block and + // all in-scope prior blocks. In case, we just learnt about its peer_id + // authorithy-id mapping we have to retry sending the messages that should be sent + // to it for all un-finalized blocks. + if entry.known_by.contains_key(&peer_id) && !retry_known_blocks { break } diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs index 7d933e2047f..11d82931651 100644 --- a/polkadot/node/network/approval-distribution/src/tests.rs +++ b/polkadot/node/network/approval-distribution/src/tests.rs @@ -130,7 +130,7 @@ fn make_peers_and_authority_ids(n: usize) -> Vec<(PeerId, AuthorityDiscoveryId)> fn make_gossip_topology( session: SessionIndex, - all_peers: &[(PeerId, AuthorityDiscoveryId)], + all_peers: &[(Option<PeerId>, AuthorityDiscoveryId)], neighbors_x: &[usize], neighbors_y: &[usize], local_index: usize, @@ -153,7 +153,7 @@ fn make_gossip_topology( assert!(all_peers.len() >= grid_size); let peer_info = |i: usize| TopologyPeerInfo { - peer_ids: vec![all_peers[i].0], + peer_ids: all_peers[i].0.into_iter().collect_vec(), validator_index: ValidatorIndex::from(i as u32), discovery_id: all_peers[i].1.clone(), }; @@ -396,7 +396,15 @@ fn try_import_the_same_assignment() { // Set up a gossip topology, where a, b, c and d are topology neighboors to the node under // testing. 
- setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // new block `hash_a` with 1 candidates let meta = BlockApprovalMeta { @@ -485,7 +493,15 @@ fn try_import_the_same_assignment_v2() { // Set up a gossip topology, where a, b, c and d are topology neighboors to the node under // testing. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // new block `hash_a` with 1 candidates let meta = BlockApprovalMeta { @@ -724,8 +740,16 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() { let peer = &peer_a; setup_peer_with_view(overseer, peer, view![], ValidationVersion::V1).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. 
- setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; // new block `hash` with 1 candidates let meta = BlockApprovalMeta { @@ -822,8 +846,16 @@ fn import_approval_happy_path_v1_v2_peers() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c are topology neighboors to the node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(0); @@ -936,8 +968,16 @@ fn import_approval_happy_path_v2() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c are topology neighboors to the node. 
- setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(0); @@ -1039,8 +1079,16 @@ fn multiple_assignments_covered_with_one_approval_vote() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c, d are topology neighboors to the node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(2); // peer_c is the originator @@ -1221,8 +1269,16 @@ fn unify_with_peer_multiple_assignments_covered_with_one_approval_vote() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology, where a, b, and c, d are topology neighboors to the node. 
- setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // import an assignment related to `hash` locally let validator_index = ValidatorIndex(2); // peer_c is the originator @@ -1571,8 +1627,16 @@ fn update_peer_view() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; let cert_a = fake_assignment_cert(hash_a, ValidatorIndex(0)); let cert_b = fake_assignment_cert(hash_b, ValidatorIndex(0)); @@ -1694,6 +1758,183 @@ fn update_peer_view() { assert!(state.blocks.get(&hash_c).unwrap().known_by.get(peer).is_none()); } +// Tests that updating the known peer_id for a given authorithy updates the topology +// and sends the required messages +#[test] +fn update_peer_authority_id() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash_a = Hash::repeat_byte(0xAA); + let hash_b = Hash::repeat_byte(0xBB); + let hash_c = Hash::repeat_byte(0xCC); + let peers = make_peers_and_authority_ids(8); + let neighbour_x_index = 0; + let neighbour_y_index = 2; + let local_index = 1; + // X neighbour, we simulate that PeerId is not known in the beginining. + let neighbour_x = peers.get(neighbour_x_index).unwrap().0; + // Y neighbour, we simulate that PeerId is not known in the beginining. 
+ let neighbour_y = peers.get(neighbour_y_index).unwrap().0; + + let _state = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + // new block `hash_a` with 1 candidates + let meta_a = BlockApprovalMeta { + hash: hash_a, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + let meta_b = BlockApprovalMeta { + hash: hash_b, + parent_hash: hash_a, + number: 2, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + let meta_c = BlockApprovalMeta { + hash: hash_c, + parent_hash: hash_b, + number: 3, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]); + overseer_send(overseer, msg).await; + + let peers_with_optional_peer_id = peers + .iter() + .enumerate() + .map(|(index, (peer_id, authority))| { + (if index == 0 { None } else { Some(*peer_id) }, authority.clone()) + }) + .collect_vec(); + + // Setup a topology where peer_a is neigboor to current node. 
+ setup_gossip_topology( + overseer, + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[neighbour_x_index], + &[neighbour_y_index], + local_index, + ), + ) + .await; + + let cert_a = fake_assignment_cert(hash_a, ValidatorIndex(local_index as u32)); + let cert_b = fake_assignment_cert(hash_b, ValidatorIndex(local_index as u32)); + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert_a.into(), 0.into()), + ) + .await; + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert_b.into(), 0.into()), + ) + .await; + + // connect a peer + setup_peer_with_view(overseer, &neighbour_x, view![hash_a], ValidationVersion::V1).await; + setup_peer_with_view(overseer, &neighbour_y, view![hash_a], ValidationVersion::V1).await; + + setup_peer_with_view(overseer, &neighbour_x, view![hash_b], ValidationVersion::V1).await; + setup_peer_with_view(overseer, &neighbour_y, view![hash_b], ValidationVersion::V1).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + )) + )) => { + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 1); + assert_eq!(peers.get(0), Some(&neighbour_y)); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + )) + )) => { + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 1); + assert_eq!(peers.get(0), Some(&neighbour_y)); + } + ); + + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::UpdatedAuthorityIds( + peers[neighbour_x_index].0, + 
[peers[neighbour_x_index].1.clone()].into_iter().collect(), + ), + ), + ) + .await; + + // we should send relevant assignments to the peer, after we found it's peer id. + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage( + peers, + Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments) + )) + )) => { + gum::info!(target: LOG_TARGET, ?peers, ?assignments); + assert_eq!(peers.len(), 1); + assert_eq!(assignments.len(), 2); + assert_eq!(assignments.get(0).unwrap().0.block_hash, hash_a); + assert_eq!(assignments.get(1).unwrap().0.block_hash, hash_b); + assert_eq!(peers.get(0), Some(&neighbour_x)); + } + ); + + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::UpdatedAuthorityIds( + peers[neighbour_y_index].0, + [peers[neighbour_y_index].1.clone()].into_iter().collect(), + ), + ), + ) + .await; + overseer_send( + overseer, + ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::UpdatedAuthorityIds( + peers[neighbour_x_index].0, + [peers[neighbour_x_index].1.clone()].into_iter().collect(), + ), + ), + ) + .await; + assert!( + overseer.recv().timeout(TIMEOUT).await.is_none(), + "no message should be sent peers are already known" + ); + + virtual_overseer + }); +} + /// E.g. if someone copies the keys... #[test] fn import_remotely_then_locally() { @@ -1808,8 +2049,16 @@ fn sends_assignments_even_when_state_is_approved() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. 
- setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; let validator_index = ValidatorIndex(0); let candidate_index = 0u32; @@ -1900,8 +2149,16 @@ fn sends_assignments_even_when_state_is_approved_v2() { let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Setup a topology where peer_a is neigboor to current node. - setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0], &[2], 1)).await; + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1), + ) + .await; let validator_index = ValidatorIndex(0); let cores = vec![0, 1, 2, 3]; @@ -2080,12 +2337,17 @@ fn propagates_locally_generated_assignment_to_both_dimensions() { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + // Set up a gossip topology. setup_gossip_topology( overseer, make_gossip_topology( 1, - &peers, + &peers_with_optional_peer_id, &[0, 10, 20, 30, 40, 60, 70, 80], &[50, 51, 52, 53, 54, 55, 56, 57], 1, @@ -2197,10 +2459,21 @@ fn propagates_assignments_along_unshared_dimension() { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + // Set up a gossip topology. 
setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2339,13 +2612,16 @@ fn propagates_to_required_after_connect() { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } } - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, make_gossip_topology( 1, - &peers, + &peers_with_optional_peer_id, &[0, 10, 20, 30, 40, 60, 70, 80], &[50, 51, 52, 53, 54, 55, 56, 57], 1, @@ -2533,11 +2809,20 @@ fn sends_to_more_peers_after_getting_topology() { let approvals = vec![approval.clone()]; let expected_indices = vec![0, 10, 20, 30, 50, 51, 52, 53]; - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2636,11 +2921,20 @@ fn originator_aggression_l1() { validator: validator_index, signature: dummy_signature(), }; - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2795,11 +3089,20 @@ fn non_originator_aggression_l1() { // import an assignment and approval locally. 
let cert = fake_assignment_cert(hash, validator_index); - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -2900,11 +3203,20 @@ fn non_originator_aggression_l2() { // import an assignment and approval locally. let cert = fake_assignment_cert(hash, validator_index); - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -3046,11 +3358,20 @@ fn resends_messages_periodically() { for (peer, _) in &peers { setup_peer_with_view(overseer, peer, view![hash], ValidationVersion::V1).await; } - + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); // Set up a gossip topology. setup_gossip_topology( overseer, - make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53], 1), + make_gossip_topology( + 1, + &peers_with_optional_peer_id, + &[0, 10, 20, 30], + &[50, 51, 52, 53], + 1, + ), ) .await; @@ -3190,7 +3511,15 @@ fn import_versioned_approval() { // Set up a gossip topology, where a, b, c and d are topology neighboors to the node under // testing. 
- setup_gossip_topology(overseer, make_gossip_topology(1, &peers, &[0, 1], &[2, 4], 3)).await; + let peers_with_optional_peer_id = peers + .iter() + .map(|(peer_id, authority)| (Some(*peer_id), authority.clone())) + .collect_vec(); + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers_with_optional_peer_id, &[0, 1], &[2, 4], 3), + ) + .await; // new block `hash_a` with 1 candidates let meta = BlockApprovalMeta { diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 76baf499cad..029401e0bd5 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -800,8 +800,11 @@ async fn handle_network_msg<Context>( }, NetworkBridgeEvent::PeerMessage(remote, message) => process_incoming_peer_message(ctx, state, metrics, remote, message, rng).await, - NetworkBridgeEvent::UpdatedAuthorityIds { .. } => { - // The bitfield-distribution subsystem doesn't deal with `AuthorityDiscoveryId`s. + NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => { + state + .topologies + .get_current_topology_mut() + .update_authority_ids(peer_id, &authority_ids); }, } } diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index a6f0fdf75bb..e9cb8a4de1c 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -270,9 +270,10 @@ where session_index, ) .await?; - - self.update_authority_ids(sender, session_info.discovery_keys).await; } + // authority_discovery is just a cache so let's try every leaf to detect if there + // are new authorities there. 
+ self.update_authority_ids(sender, session_info.discovery_keys).await; } } Ok(()) diff --git a/polkadot/node/network/protocol/src/grid_topology.rs b/polkadot/node/network/protocol/src/grid_topology.rs index 8bd9adbc17c..3c4372a27a2 100644 --- a/polkadot/node/network/protocol/src/grid_topology.rs +++ b/polkadot/node/network/protocol/src/grid_topology.rs @@ -89,6 +89,26 @@ impl SessionGridTopology { SessionGridTopology { shuffled_indices, canonical_shuffling, peer_ids } } + /// Updates the known peer ids for the passed authorithies ids. + pub fn update_authority_ids( + &mut self, + peer_id: PeerId, + ids: &HashSet<AuthorityDiscoveryId>, + ) -> bool { + let mut updated = false; + if !self.peer_ids.contains(&peer_id) { + for peer in self + .canonical_shuffling + .iter_mut() + .filter(|peer| ids.contains(&peer.discovery_id)) + { + peer.peer_ids.push(peer_id); + self.peer_ids.insert(peer_id); + updated = true; + } + } + updated + } /// Produces the outgoing routing logic for a particular peer. /// /// Returns `None` if the validator index is out of bounds. @@ -269,6 +289,7 @@ impl GridNeighbors { pub struct SessionGridTopologyEntry { topology: SessionGridTopology, local_neighbors: GridNeighbors, + local_index: Option<ValidatorIndex>, } impl SessionGridTopologyEntry { @@ -291,6 +312,25 @@ impl SessionGridTopologyEntry { pub fn is_validator(&self, peer: &PeerId) -> bool { self.topology.is_validator(peer) } + + /// Updates the known peer ids for the passed authorithies ids. + pub fn update_authority_ids( + &mut self, + peer_id: PeerId, + ids: &HashSet<AuthorityDiscoveryId>, + ) -> bool { + let peer_id_updated = self.topology.update_authority_ids(peer_id, ids); + // If we added a new peer id we need to recompute the grid neighbors, so that + // neighbors_x and neighbors_y reflect the right peer ids. 
+ if peer_id_updated { + if let Some(local_index) = self.local_index.as_ref() { + if let Some(new_grid) = self.topology.compute_grid_neighbors_for(*local_index) { + self.local_neighbors = new_grid; + } + } + } + peer_id_updated + } } /// A set of topologies indexed by session @@ -305,6 +345,20 @@ impl SessionGridTopologies { self.inner.get(&session).and_then(|val| val.0.as_ref()) } + /// Updates the known peer ids for the passed authorithies ids. + pub fn update_authority_ids( + &mut self, + peer_id: PeerId, + ids: &HashSet<AuthorityDiscoveryId>, + ) -> bool { + self.inner + .iter_mut() + .map(|(_, topology)| { + topology.0.as_mut().map(|topology| topology.update_authority_ids(peer_id, ids)) + }) + .any(|updated| updated.unwrap_or_default()) + } + /// Increase references counter for a specific topology pub fn inc_session_refs(&mut self, session: SessionIndex) { self.inner.entry(session).or_insert((None, 0)).1 += 1; @@ -333,7 +387,7 @@ impl SessionGridTopologies { .and_then(|l| topology.compute_grid_neighbors_for(l)) .unwrap_or_else(GridNeighbors::empty); - entry.0 = Some(SessionGridTopologyEntry { topology, local_neighbors }); + entry.0 = Some(SessionGridTopologyEntry { topology, local_neighbors, local_index }); } } } @@ -368,6 +422,7 @@ impl Default for SessionBoundGridTopologyStorage { peer_ids: Default::default(), }, local_neighbors: GridNeighbors::empty(), + local_index: None, }, }, prev_topology: None, @@ -412,7 +467,7 @@ impl SessionBoundGridTopologyStorage { let old_current = std::mem::replace( &mut self.current_topology, GridTopologySessionBound { - entry: SessionGridTopologyEntry { topology, local_neighbors }, + entry: SessionGridTopologyEntry { topology, local_neighbors, local_index }, session_index, }, ); diff --git a/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs b/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs index 93f97fe1dd6..e22883f8937 100644 --- 
a/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs +++ b/polkadot/node/network/statement-distribution/src/legacy_v1/mod.rs @@ -1892,7 +1892,9 @@ pub(crate) async fn handle_network_update<Context, R>( ?authority_ids, "Updated `AuthorityDiscoveryId`s" ); - + topology_storage + .get_current_topology_mut() + .update_authority_ids(peer, &authority_ids); // Remove the authority IDs which were previously mapped to the peer // but aren't part of the new set. authorities.retain(|a, p| p != &peer || authority_ids.contains(a)); -- GitLab