diff --git a/polkadot/node/network/bridge/src/rx/mod.rs b/polkadot/node/network/bridge/src/rx/mod.rs index bb99536f78334d024d8fe60974a7303fa783a39a..ba550df8f1a455134b47fcb6fbd90653f67c88e0 100644 --- a/polkadot/node/network/bridge/src/rx/mod.rs +++ b/polkadot/node/network/bridge/src/rx/mod.rs @@ -57,6 +57,7 @@ use polkadot_primitives::{AuthorityDiscoveryId, BlockNumber, Hash, ValidatorInde use std::{ collections::{hash_map, HashMap}, iter::ExactSizeIterator, + u32, }; use super::validator_discovery; @@ -750,7 +751,7 @@ where // This is kept sorted, descending, by block number. let mut live_heads: Vec<ActivatedLeaf> = Vec::with_capacity(MAX_VIEW_HEADS); let mut finalized_number = 0; - + let mut newest_session = u32::MIN; let mut mode = Mode::Syncing(sync_oracle); loop { match ctx.recv().fuse().await? { @@ -775,15 +776,29 @@ where flesh_out_topology_peers(&mut authority_discovery_service, canonical_shuffling) .await; - dispatch_validation_event_to_all_unbounded( - NetworkBridgeEvent::NewGossipTopology(NewGossipTopology { - session, - topology: SessionGridTopology::new(shuffled_indices, topology_peers), - local_index, - }), - ctx.sender(), - approval_voting_parallel_enabled, - ); + if session >= newest_session { + dispatch_validation_event_to_all_unbounded( + NetworkBridgeEvent::NewGossipTopology(NewGossipTopology { + session, + topology: SessionGridTopology::new(shuffled_indices, topology_peers), + local_index, + }), + ctx.sender(), + approval_voting_parallel_enabled, + ); + } else { + dispatch_validation_event_to_approval_unbounded( + &NetworkBridgeEvent::NewGossipTopology(NewGossipTopology { + session, + topology: SessionGridTopology::new(shuffled_indices, topology_peers), + local_index, + }), + ctx.sender(), + approval_voting_parallel_enabled, + ); + } + + newest_session = newest_session.max(session); }, FromOrchestra::Communication { msg: NetworkBridgeRxMessage::UpdatedAuthorityIds { peer_id, authority_ids }, @@ -1123,22 +1138,11 @@ async fn dispatch_collation_event_to_all( dispatch_collation_events_to_all(std::iter::once(event), ctx).await } -fn dispatch_validation_event_to_all_unbounded( - event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>, +fn dispatch_validation_event_to_approval_unbounded( + event: &NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>, sender: &mut impl overseer::NetworkBridgeRxSenderTrait, approval_voting_parallel_enabled: bool, ) { - event - .focus() - .ok() - .map(StatementDistributionMessage::from) - .and_then(|msg| Some(sender.send_unbounded_message(msg))); - event - .focus() - .ok() - .map(BitfieldDistributionMessage::from) - .and_then(|msg| Some(sender.send_unbounded_message(msg))); - if approval_voting_parallel_enabled { event .focus() @@ -1152,6 +1156,30 @@ fn dispatch_validation_event_to_all_unbounded( .map(ApprovalDistributionMessage::from) .and_then(|msg| Some(sender.send_unbounded_message(msg))); } +} + +fn dispatch_validation_event_to_all_unbounded( + event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>, + sender: &mut impl overseer::NetworkBridgeRxSenderTrait, + approval_voting_parallel_enabled: bool, +) { + event + .focus() + .ok() + .map(StatementDistributionMessage::from) + .and_then(|msg| Some(sender.send_unbounded_message(msg))); + event + .focus() + .ok() + .map(BitfieldDistributionMessage::from) + .and_then(|msg| Some(sender.send_unbounded_message(msg))); + + dispatch_validation_event_to_approval_unbounded( + &event, + sender, + approval_voting_parallel_enabled, + ); + event .focus() .ok() diff --git a/polkadot/node/network/bridge/src/rx/tests.rs b/polkadot/node/network/bridge/src/rx/tests.rs index e3f2715ef2b07fc2738b244a1e92cf672bf5d2a0..4cd903a8b31a0298d59e30954dcbd4fe596e2d90 100644 --- a/polkadot/node/network/bridge/src/rx/tests.rs +++ b/polkadot/node/network/bridge/src/rx/tests.rs @@ -22,9 +22,11 @@ use polkadot_node_subsystem::messages::NetworkBridgeEvent; use assert_matches::assert_matches; use async_trait::async_trait; use parking_lot::Mutex; +use polkadot_overseer::TimeoutExt; use std::{ collections::HashSet, sync::atomic::{AtomicBool, Ordering}, + time::Duration, }; use sc_network::{ @@ -1465,6 +1467,120 @@ fn network_protocol_versioning_view_update() { }); } +// Test rx bridge sends the newest gossip topology to all subsystems and old ones only to approval +// distribution. +#[test] +fn network_new_topology_update() { + let (oracle, handle) = make_sync_oracle(false); + test_harness(Box::new(oracle), |test_harness| async move { + let TestHarness { mut network_handle, mut virtual_overseer, shared } = test_harness; + + let peer_ids: Vec<_> = (0..4).map(|_| PeerId::random()).collect(); + let peers = [ + (peer_ids[0], PeerSet::Validation, ValidationVersion::V2), + (peer_ids[1], PeerSet::Validation, ValidationVersion::V1), + (peer_ids[2], PeerSet::Validation, ValidationVersion::V1), + (peer_ids[3], PeerSet::Collation, ValidationVersion::V2), + ]; + + let head = Hash::repeat_byte(1); + virtual_overseer + .send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::start_work(new_leaf(head, 1)), + ))) + .await; + + handle.await_mode_switch().await; + + let mut total_validation_peers = 0; + let mut total_collation_peers = 0; + + for &(peer_id, peer_set, version) in &peers { + network_handle + .connect_peer(peer_id, version, peer_set, ObservedRole::Full) + .await; + + match peer_set { + PeerSet::Validation => total_validation_peers += 1, + PeerSet::Collation => total_collation_peers += 1, + } + } + + await_peer_connections(&shared, total_validation_peers, total_collation_peers).await; + + // Drain setup messages. + while let Some(_) = virtual_overseer.recv().timeout(Duration::from_secs(1)).await {} + + // 1. Send new gossip topology and check is sent to all subsystems. + virtual_overseer + .send(polkadot_overseer::FromOrchestra::Communication { + msg: NetworkBridgeRxMessage::NewGossipTopology { + session: 2, + local_index: Some(ValidatorIndex(0)), + canonical_shuffling: Vec::new(), + shuffled_indices: Vec::new(), + }, + }) + .await; + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::NewGossipTopology(NewGossipTopology { + session: 2, + topology: SessionGridTopology::new(Vec::new(), Vec::new()), + local_index: Some(ValidatorIndex(0)), + }), + &mut virtual_overseer, + ) + .await; + + // 2. Send old gossip topology and check is sent only to approval distribution. + virtual_overseer + .send(polkadot_overseer::FromOrchestra::Communication { + msg: NetworkBridgeRxMessage::NewGossipTopology { + session: 1, + local_index: Some(ValidatorIndex(0)), + canonical_shuffling: Vec::new(), + shuffled_indices: Vec::new(), + }, + }) + .await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::NewGossipTopology(NewGossipTopology { + session: 1, + topology: _, + local_index: _, + }) + )) + ); + + // 3. Send new gossip topology and check is sent to all subsystems. + virtual_overseer + .send(polkadot_overseer::FromOrchestra::Communication { + msg: NetworkBridgeRxMessage::NewGossipTopology { + session: 3, + local_index: Some(ValidatorIndex(0)), + canonical_shuffling: Vec::new(), + shuffled_indices: Vec::new(), + }, + }) + .await; + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::NewGossipTopology(NewGossipTopology { + session: 3, + topology: SessionGridTopology::new(Vec::new(), Vec::new()), + local_index: Some(ValidatorIndex(0)), + }), + &mut virtual_overseer, + ) + .await; + virtual_overseer + }); +} + #[test] fn network_protocol_versioning_subsystem_msg() { use polkadot_primitives::CandidateHash; diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index cd327c11e408c5117f9679938eb157e713b7afd9..dece8fae509392c695cabdde45970f4f7c01e68a 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -28,6 +28,7 @@ use std::{ collections::{HashMap, HashSet}, fmt, time::{Duration, Instant}, + u32, }; use futures::{channel::oneshot, select, FutureExt as _}; @@ -45,8 +46,8 @@ use polkadot_node_network_protocol::{ }; use polkadot_node_subsystem::{ messages::{ - GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage, NetworkBridgeTxMessage, - RuntimeApiMessage, RuntimeApiRequest, + ChainApiMessage, GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage, + NetworkBridgeTxMessage, RuntimeApiMessage, RuntimeApiRequest, }, overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; @@ -96,6 +97,8 @@ pub struct GossipSupport<AD> { keystore: KeystorePtr, last_session_index: Option<SessionIndex>, + /// The minimum known session we build the topology for. + min_known_session: SessionIndex, // Some(timestamp) if we failed to resolve // at least a third of authorities the last time. // `None` otherwise. @@ -130,6 +133,9 @@ pub struct GossipSupport<AD> { /// Authority discovery service. authority_discovery: AD, + /// The oldest session we need to build a topology for because + /// the finalized blocks are from a session we haven't built a topology for. + finalized_needed_session: Option<u32>, /// Subsystem metrics. metrics: Metrics, } @@ -154,7 +160,9 @@ where resolved_authorities: HashMap::new(), connected_authorities: HashMap::new(), connected_peers: HashMap::new(), + min_known_session: u32::MAX, authority_discovery, + finalized_needed_session: None, metrics, } } @@ -199,7 +207,22 @@ where gum::debug!(target: LOG_TARGET, error = ?e); } }, - FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, _number)) => {}, + FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, _number)) => + if let Some(session_index) = self.last_session_index { + if let Err(e) = self + .build_topology_for_last_finalized_if_needed( + ctx.sender(), + session_index, + ) + .await + { + gum::warn!( + target: LOG_TARGET, + "Failed to build topology for last finalized session: {:?}", + e + ); + } + }, FromOrchestra::Signal(OverseerSignal::Conclude) => return self, } } @@ -294,9 +317,19 @@ where } if is_new_session { + if let Err(err) = self + .build_topology_for_last_finalized_if_needed(sender, session_index) + .await + { + gum::warn!( + target: LOG_TARGET, + "Failed to build topology for last finalized session: {:?}", + err + ); + } + // Gossip topology is only relevant for authorities in the current session. let our_index = self.get_key_index_and_update_metrics(&session_info)?; - update_gossip_topology( sender, our_index, @@ -314,6 +347,85 @@ where Ok(()) } + /// Build the gossip topology for the session of the last finalized block if we haven't built + /// one. + /// + /// This is needed to ensure that if finality is lagging accross session boundary and a restart + /// happens after the new session started, we built a topology from the session we haven't + /// finalized the blocks yet. + /// Once finalized blocks start to be from a session we've built a topology for, we can stop. + async fn build_topology_for_last_finalized_if_needed( + &mut self, + sender: &mut impl overseer::GossipSupportSenderTrait, + current_session_index: u32, + ) -> Result<(), util::Error> { + self.min_known_session = self.min_known_session.min(current_session_index); + + if self + .finalized_needed_session + .map(|oldest_needed_session| oldest_needed_session < self.min_known_session) + .unwrap_or(true) + { + let (tx, rx) = oneshot::channel(); + sender.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await; + let finalized_block_number = match rx.await? { + Ok(block_number) => block_number, + _ => return Ok(()), + }; + + let (tx, rx) = oneshot::channel(); + sender + .send_message(ChainApiMessage::FinalizedBlockHash(finalized_block_number, tx)) + .await; + + let finalized_block_hash = match rx.await? { + Ok(Some(block_hash)) => block_hash, + _ => return Ok(()), + }; + + let finalized_session_index = + util::request_session_index_for_child(finalized_block_hash, sender) + .await + .await??; + + if finalized_session_index < self.min_known_session && + Some(finalized_session_index) != self.finalized_needed_session + { + gum::debug!( + target: LOG_TARGET, + ?finalized_block_hash, + ?finalized_block_number, + ?finalized_session_index, + "Building topology for finalized block session", + ); + + let finalized_session_info = match util::request_session_info( + finalized_block_hash, + finalized_session_index, + sender, + ) + .await + .await?? + { + Some(session_info) => session_info, + _ => return Ok(()), + }; + + let our_index = self.get_key_index_and_update_metrics(&finalized_session_info)?; + update_gossip_topology( + sender, + our_index, + finalized_session_info.discovery_keys.clone(), + finalized_block_hash, + finalized_session_index, + ) + .await?; + } + self.finalized_needed_session = Some(finalized_session_index); + } + Ok(()) + } + // Checks if the node is an authority and also updates `polkadot_node_is_authority` and // `polkadot_node_is_parachain_validator` metrics accordingly. // On success, returns the index of our keys in `session_info.discovery_keys`. diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs index 399f29db67da8c029cab8b151eff220e03398b68..873a1122f2c5e29a50f0ca7e79e034ddc745c711 100644 --- a/polkadot/node/network/gossip-support/src/tests.rs +++ b/polkadot/node/network/gossip-support/src/tests.rs @@ -262,6 +262,38 @@ async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages { msg } +async fn provide_info_for_finalized(overseer: &mut VirtualOverseer, test_session: SessionIndex) { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( + channel, + )) => { + channel.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( + _, + channel, + )) => { + channel.send(Ok(Some(Hash::repeat_byte(0xAA)))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + // assert_eq!(relay_parent, hash); + tx.send(Ok(test_session)).unwrap(); + } + ); +} + async fn test_neighbors(overseer: &mut VirtualOverseer, expected_session: SessionIndex) { assert_matches!( overseer_recv(overseer).await, @@ -375,6 +407,7 @@ fn issues_a_connection_request_on_new_session() { assert_eq!(peer_set, PeerSet::Validation); } ); + provide_info_for_finalized(overseer, 1).await; test_neighbors(overseer, 1).await; @@ -527,6 +560,7 @@ fn issues_connection_request_to_past_present_future() { assert_eq!(peer_set, PeerSet::Validation); } ); + provide_info_for_finalized(overseer, 1).await; // Ensure neighbors are unaffected test_neighbors(overseer, 1).await; @@ -607,6 +641,7 @@ fn issues_update_authorities_after_session() { } ); + provide_info_for_finalized(overseer, 1).await; // Ensure neighbors are unaffected assert_matches!( overseer_recv(overseer).await, @@ -878,6 +913,7 @@ fn test_quickly_connect_to_authorities_that_changed_address() { } ); + provide_info_for_finalized(overseer, 1).await; // Ensure neighbors are unaffected assert_matches!( overseer_recv(overseer).await, @@ -1171,6 +1207,7 @@ fn disconnect_when_not_in_past_present_future() { } ); + provide_info_for_finalized(overseer, 1).await; virtual_overseer }, ); @@ -1268,13 +1305,13 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { assert_eq!(peer_set, PeerSet::Validation); } ); + provide_info_for_finalized(overseer, 1).await; test_neighbors(overseer, 1).await; virtual_overseer }) }; - assert_eq!(state.last_session_index, Some(1)); assert!(state.last_failure.is_some()); state.last_failure = state.last_failure.and_then(|i| i.checked_sub(BACKOFF_DURATION)); @@ -1340,6 +1377,158 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { assert!(state.last_failure.is_none()); } +// Test that topology is updated for all sessions we still have unfinalized blocks for. +#[test] +fn updates_topology_for_all_finalized_blocks() { + let hash = Hash::repeat_byte(0xAA); + let mock_authority_discovery = + MockAuthorityDiscovery::new(PAST_PRESENT_FUTURE_AUTHORITIES.clone()); + test_harness( + make_subsystem_with_authority_discovery(mock_authority_discovery.clone()), + |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + let active_session = 5; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(active_session)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, active_session); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + let all_without_ferdie: Vec<_> = PAST_PRESENT_FUTURE_AUTHORITIES + .iter() + .cloned() + .filter(|p| p != &Sr25519Keyring::Ferdie.public().into()) + .collect(); + + let addrs = get_multiaddrs(all_without_ferdie, mock_authority_discovery.clone()).await; + + assert_eq!(validator_addrs, addrs); + assert_eq!(peer_set, PeerSet::Validation); + } + ); + + // Ensure first time we update the topology we also update topology for the session last + // finalized is in. + provide_info_for_finalized(overseer, 1).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); + // Ensure we received topology for the session last finalized is in and the current + // active session + test_neighbors(overseer, 1).await; + test_neighbors(overseer, active_session).await; + + let mut block_number = 3; + // As finalized progresses, we should update topology for all sessions until we caught + // up with the known sessions. + for finalized in 2..active_session { + block_number += 1; + overseer + .send(FromOrchestra::Signal(OverseerSignal::BlockFinalized( + Hash::repeat_byte(block_number as u8), + block_number, + ))) + .timeout(TIMEOUT) + .await + .expect("signal send timeout"); + provide_info_for_finalized(overseer, finalized).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, finalized); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); + test_neighbors(overseer, finalized).await; + + block_number += 1; + overseer + .send(FromOrchestra::Signal(OverseerSignal::BlockFinalized( + Hash::repeat_byte(block_number as u8), + block_number, + ))) + .timeout(TIMEOUT) + .await + .expect("signal send timeout"); + provide_info_for_finalized(overseer, finalized).await; + } + + // No topology update is sent once finalized block is in the active session. + block_number += 1; + overseer + .send(FromOrchestra::Signal(OverseerSignal::BlockFinalized( + Hash::repeat_byte(block_number as u8), + block_number, + ))) + .timeout(TIMEOUT) + .await + .expect("signal send timeout"); + provide_info_for_finalized(overseer, active_session).await; + + // Code becomes no-op after we caught up with the last finalized block being in the + // active session. + block_number += 1; + overseer + .send(FromOrchestra::Signal(OverseerSignal::BlockFinalized( + Hash::repeat_byte(block_number as u8), + block_number, + ))) + .timeout(TIMEOUT) + .await + .expect("signal send timeout"); + + virtual_overseer + }, + ); +} + // note: this test was added at a time where the default `rand::SliceRandom::shuffle` // function was used to shuffle authorities for the topology and ensures backwards compatibility. // diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 17470d74577d2147bb4faf2461aa2e1faa85ffae..a8d0ab90f6b91d35fe4967960b9e73ec94a0d1db 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -615,6 +615,7 @@ pub struct Overseer<SupportsParachains> { NetworkBridgeRxMessage, // TODO <https://github.com/paritytech/polkadot/issues/5626> RuntimeApiMessage, ChainSelectionMessage, + ChainApiMessage, ], can_receive_priority_messages)] gossip_support: GossipSupport, diff --git a/prdoc/pr_6913.prdoc b/prdoc/pr_6913.prdoc new file mode 100644 index 0000000000000000000000000000000000000000..16d2564a6eed86e5b136fc51157dcafcc528b44b --- /dev/null +++ b/prdoc/pr_6913.prdoc @@ -0,0 +1,13 @@ +title: Enable approval-voting-parallel by default on polkadot + +doc: + - audience: Node Dev + description: | + Enable approval-voting-parallel by default on polkadot +crates: + - name: polkadot-network-bridge + bump: minor + - name: polkadot-gossip-support + bump: minor + - name: polkadot-overseer + bump: minor