From 064df81ee48d6da176e487fe1c7ebe3d9dcad099 Mon Sep 17 00:00:00 2001 From: Robert Habermeier <rphmeier@gmail.com> Date: Fri, 26 Mar 2021 13:06:40 +0100 Subject: [PATCH] Add block number to activated leaves and associated fixes (#2718) * add number to `ActivatedLeavesUpdate` * update subsystem util and overseer * use new ActivatedLeaf everywhere * sort view * sorted and limited view in network bridge * use live block hash only if it's newer * grumples --- polkadot/node/collation-generation/src/lib.rs | 2 +- polkadot/node/core/approval-voting/src/lib.rs | 3 +- polkadot/node/core/av-store/src/lib.rs | 4 +- polkadot/node/core/av-store/src/tests.rs | 14 ++- polkadot/node/core/backing/src/lib.rs | 11 +- .../src/requester/mod.rs | 7 +- .../src/tests/state.rs | 14 ++- .../network/availability-recovery/src/lib.rs | 18 +-- .../availability-recovery/src/tests.rs | 44 +++++-- .../network/bitfield-distribution/src/lib.rs | 6 +- polkadot/node/network/bridge/src/lib.rs | 112 +++++++++++++++--- .../collator-protocol/src/collator_side.rs | 8 +- .../node/network/gossip-support/src/lib.rs | 2 +- .../node/network/pov-distribution/src/lib.rs | 5 +- .../network/pov-distribution/src/tests.rs | 14 ++- .../network/statement-distribution/src/lib.rs | 13 +- polkadot/node/overseer/src/lib.rs | 53 +++++++-- polkadot/node/subsystem-util/src/lib.rs | 20 +++- polkadot/node/subsystem/src/lib.rs | 31 +++-- .../src/types/overseer-protocol.md | 2 +- 20 files changed, 286 insertions(+), 97 deletions(-) diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index f91b71e839e..fe7a448ac91 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -128,7 +128,7 @@ impl CollationGenerationSubsystem { let metrics = self.metrics.clone(); if let Err(err) = handle_new_activations( config.clone(), - activated.into_iter().map(|v| v.0), + activated.into_iter().map(|v| v.hash), ctx, metrics, sender, diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 4ee997d77d8..d04ab79f0cc 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -591,7 +591,8 @@ async fn handle_from_overseer( FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => { let mut actions = Vec::new(); - for (head, _span) in update.activated { + for activated in update.activated { + let head = activated.hash; match import::handle_new_head( ctx, state, diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 226bfe13dc4..18200b5ac7a 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -551,9 +551,9 @@ where FromOverseer::Signal(OverseerSignal::ActiveLeaves( ActiveLeavesUpdate { activated, .. }) ) => { - for (activated, _span) in activated.into_iter() { + for activated in activated.into_iter() { let _timer = subsystem.metrics.time_block_activated(); - process_block_activated(ctx, subsystem, activated).await?; + process_block_activated(ctx, subsystem, activated.hash).await?; } } FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => { diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs index c92e28ce3d8..465fae94a26 100644 --- a/polkadot/node/core/av-store/src/tests.rs +++ b/polkadot/node/core/av-store/src/tests.rs @@ -31,7 +31,7 @@ use polkadot_primitives::v1::{ }; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem::{ - ActiveLeavesUpdate, errors::RuntimeApiError, jaeger, messages::AllMessages, + ActiveLeavesUpdate, errors::RuntimeApiError, jaeger, messages::AllMessages, ActivatedLeaf, }; use polkadot_node_subsystem_test_helpers as test_helpers; use sp_keyring::Sr25519Keyring; @@ -240,7 +240,11 @@ fn runtime_api_error_does_not_stop_the_subsystem() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: vec![(new_leaf, Arc::new(jaeger::Span::Disabled))].into(), + activated: vec![ActivatedLeaf { + hash: new_leaf, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }].into(), deactivated: vec![].into(), }), ).await; @@ -885,7 +889,11 @@ async fn import_leaf( overseer_signal( virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: vec![(new_leaf, Arc::new(jaeger::Span::Disabled))].into(), + activated: vec![ActivatedLeaf { + hash: new_leaf, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }].into(), deactivated: vec![].into(), }), ).await; diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 6dd64b56a7a..0e1a8ec8f17 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -1249,7 +1249,7 @@ mod tests { use polkadot_primitives::v1::{BlockData, GroupRotationInfo, HeadData, PersistedValidationData, ScheduledCore}; use polkadot_subsystem::{ messages::{RuntimeApiRequest, RuntimeApiMessage}, - ActiveLeavesUpdate, FromOverseer, OverseerSignal, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, ActivatedLeaf, }; use polkadot_node_primitives::InvalidCandidate; use sp_keyring::Sr25519Keyring; @@ -1428,10 +1428,11 @@ mod tests { ) { // Start work on some new parent. virtual_overseer.send(FromOverseer::Signal( - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work( - test_state.relay_parent, - Arc::new(jaeger::Span::Disabled), - ))) + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: test_state.relay_parent, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }))) ).await; // Check that subsystem job issues a request for a validator set. diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index 52843581c42..ad7b1f036b0 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -23,7 +23,6 @@ use std::collections::{ }; use std::iter::IntoIterator; use std::pin::Pin; -use std::sync::Arc; use futures::{ channel::mpsc, @@ -36,7 +35,7 @@ use sp_keystore::SyncCryptoStorePtr; use polkadot_node_subsystem_util::request_availability_cores_ctx; use polkadot_primitives::v1::{CandidateHash, CoreState, Hash, OccupiedCore}; use polkadot_subsystem::{ - messages::AllMessages, ActiveLeavesUpdate, jaeger, SubsystemContext, + messages::AllMessages, ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf, }; use super::{error::recv_runtime, session_cache::SessionCache, LOG_TARGET, Metrics}; @@ -121,12 +120,12 @@ impl Requester { async fn start_requesting_chunks<Context>( &mut self, ctx: &mut Context, - new_heads: impl Iterator<Item = (Hash, Arc<jaeger::Span>)>, + new_heads: impl Iterator<Item = ActivatedLeaf>, ) -> super::Result<Option<NonFatalError>> where Context: SubsystemContext, { - for (leaf, _) in new_heads { + for ActivatedLeaf { hash: leaf, .. } in new_heads { let cores = match query_occupied_cores(ctx, leaf).await { Err(err) => return Ok(Some(err)), Ok(cores) => cores, diff --git a/polkadot/node/network/availability-distribution/src/tests/state.rs b/polkadot/node/network/availability-distribution/src/tests/state.rs index a227cca0367..42cd72dff08 100644 --- a/polkadot/node/network/availability-distribution/src/tests/state.rs +++ b/polkadot/node/network/availability-distribution/src/tests/state.rs @@ -32,9 +32,11 @@ use sc_network as network; use sc_network::IfDisconnected; use sc_network::config as netconfig; -use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal, messages::{AllMessages, - AvailabilityDistributionMessage, AvailabilityStoreMessage, NetworkBridgeMessage, RuntimeApiMessage, - RuntimeApiRequest} +use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal, ActivatedLeaf, + messages::{ + AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, NetworkBridgeMessage, + RuntimeApiMessage, RuntimeApiRequest, + } }; use polkadot_primitives::v1::{CandidateHash, CoreState, ErasureChunk, GroupIndex, Hash, Id as ParaId, ScheduledCore, SessionInfo, ValidatorId, @@ -169,7 +171,11 @@ impl TestState { self .relay_chain.iter().zip(advanced) .map(|(old, new)| ActiveLeavesUpdate { - activated: smallvec![(new.clone(), Arc::new(jaeger::Span::Disabled))], + activated: smallvec![ActivatedLeaf { + hash: new.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], deactivated: smallvec![old.clone()], }).collect::<Vec<_>>() }; diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 3470e2c3cd9..d8d282e1152 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -28,7 +28,7 @@ use rand::seq::SliceRandom; use polkadot_primitives::v1::{ AuthorityDiscoveryId, AvailableData, CandidateReceipt, CandidateHash, Hash, ErasureChunk, ValidatorId, ValidatorIndex, - SessionInfo, SessionIndex, BlakeTwo256, HashT, GroupIndex, + SessionInfo, SessionIndex, BlakeTwo256, HashT, GroupIndex, BlockNumber, }; use polkadot_subsystem::{ SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer, @@ -473,7 +473,7 @@ struct State { interactions: HashMap<CandidateHash, InteractionHandle>, /// A recent block hash for which state should be available. - live_block_hash: Hash, + live_block: (BlockNumber, Hash), /// interaction communication. This is cloned and given to interactions that are spun up. from_interaction_tx: mpsc::Sender<FromInteraction>, @@ -491,7 +491,7 @@ impl Default for State { Self { interactions: HashMap::new(), - live_block_hash: Hash::default(), + live_block: (0, Hash::default()), from_interaction_tx, from_interaction_rx, availability_lru: LruCache::new(LRU_SIZE), @@ -521,9 +521,11 @@ async fn handle_signal( match signal { OverseerSignal::Conclude => Ok(true), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }) => { - // if activated is non-empty, set state.live_block_hash to the first block in Activated. - if let Some(hash) = activated.get(0) { - state.live_block_hash = hash.0; + // if activated is non-empty, set state.live_block to the highest block in `activated` + for activated in activated { + if activated.number > state.live_block.0 { + state.live_block = (activated.number, activated.hash) + } } Ok(false) @@ -630,7 +632,7 @@ async fn handle_recover( let _span = span.child("not-cached"); let session_info = request_session_info_ctx( - state.live_block_hash, + state.live_block.1, session_index, ctx, ).await?.await.map_err(error::Error::CanceledSessionInfo)??; @@ -651,7 +653,7 @@ async fn handle_recover( None => { tracing::warn!( target: LOG_TARGET, - "SessionInfo is `None` at {}", state.live_block_hash, + "SessionInfo is `None` at {:?}", state.live_block, ); response_sender .send(Err(RecoveryError::Unavailable)) diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs index 2dda6cbe59f..f7e2a2f30b6 100644 --- a/polkadot/node/network/availability-recovery/src/tests.rs +++ b/polkadot/node/network/availability-recovery/src/tests.rs @@ -32,7 +32,7 @@ use polkadot_primitives::v1::{ use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks}; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem_testhelpers as test_helpers; -use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger}; +use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger, ActivatedLeaf}; type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>; @@ -418,7 +418,11 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))], + activated: smallvec![ActivatedLeaf { + hash: test_state.current.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], deactivated: smallvec![], }), ).await; @@ -490,7 +494,11 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))], + activated: smallvec![ActivatedLeaf { + hash: test_state.current.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], deactivated: smallvec![], }), ).await; @@ -562,7 +570,11 @@ fn bad_merkle_path_leads_to_recovery_error() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))], + activated: smallvec![ActivatedLeaf { + hash: test_state.current.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], deactivated: smallvec![], }), ).await; @@ -612,7 +624,11 @@ fn wrong_chunk_index_leads_to_recovery_error() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))], + activated: smallvec![ActivatedLeaf { + hash: test_state.current.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], deactivated: smallvec![], }), ).await; @@ -682,7 +698,11 @@ fn invalid_erasure_coding_leads_to_invalid_error() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))], + activated: smallvec![ActivatedLeaf { + hash: test_state.current.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], deactivated: smallvec![], }), ).await; @@ -723,7 +743,11 @@ fn fast_path_backing_group_recovers() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))], + activated: smallvec![ActivatedLeaf { + hash: test_state.current.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], deactivated: smallvec![], }), ).await; @@ -768,7 +792,11 @@ fn no_answers_in_fast_path_causes_chunk_requests() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))], + activated: smallvec![ActivatedLeaf { + hash: test_state.current.clone(), + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }], deactivated: smallvec![], }), ).await; diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index fff3ede9518..546a086a08a 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -192,9 +192,11 @@ impl BitfieldDistribution { FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => { let _timer = self.metrics.time_active_leaves_update(); - for (relay_parent, span) in activated { + for activated in activated { + let relay_parent = activated.hash; + tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "activated"); - let span = PerLeafSpan::new(span, "bitfield-distribution"); + let span = PerLeafSpan::new(activated.span, "bitfield-distribution"); let _span = span.child("query-basics"); // query validator set and signing context per relay_parent once only diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 1c5b2121652..6afe21f26ba 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -24,8 +24,8 @@ use parity_scale_codec::{Encode, Decode}; use futures::prelude::*; use polkadot_subsystem::{ - ActiveLeavesUpdate, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, - SubsystemResult, jaeger, + ActiveLeavesUpdate, ActivatedLeaf, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, + SubsystemResult, }; use polkadot_subsystem::messages::{ NetworkBridgeMessage, AllMessages, @@ -43,7 +43,6 @@ pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority}; use std::collections::{HashMap, hash_map}; use std::iter::ExactSizeIterator; -use std::sync::Arc; use std::time::Instant; mod validator_discovery; @@ -154,8 +153,8 @@ where { let mut event_stream = bridge.network_service.event_stream().fuse(); - // Most recent heads are at the back. - let mut live_heads: Vec<(Hash, Arc<jaeger::Span>)> = Vec::with_capacity(MAX_VIEW_HEADS); + // This is kept sorted, descending, by block number. + let mut live_heads: Vec<ActivatedLeaf> = Vec::with_capacity(MAX_VIEW_HEADS); let mut local_view = View::default(); let mut finalized_number = 0; @@ -315,8 +314,14 @@ where num_deactivated = %deactivated.len(), ); - live_heads.extend(activated); - live_heads.retain(|h| !deactivated.contains(&h.0)); + for activated in activated { + let pos = live_heads + .binary_search_by(|probe| probe.number.cmp(&activated.number).reverse()) + .unwrap_or_else(|i| i); + + live_heads.insert(pos, activated); + } + live_heads.retain(|h| !deactivated.contains(&h.hash)); update_our_view( &mut bridge.network_service, @@ -490,8 +495,8 @@ where fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_number: BlockNumber) -> View { View::new( - live_heads.rev().take(MAX_VIEW_HEADS), - finalized_number + live_heads.take(MAX_VIEW_HEADS), + finalized_number, ) } @@ -499,13 +504,13 @@ fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_n async fn update_our_view( net: &mut impl Network, ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>, - live_heads: &[(Hash, Arc<jaeger::Span>)], + live_heads: &[ActivatedLeaf], local_view: &mut View, finalized_number: BlockNumber, validation_peers: &HashMap<PeerId, PeerData>, collation_peers: &HashMap<PeerId, PeerData>, ) -> SubsystemResult<()> { - let new_view = construct_view(live_heads.iter().map(|v| v.0), finalized_number); + let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number); // We only want to send a view update when the heads changed. // A change in finalized block number only is _not_ sufficient. @@ -527,7 +532,10 @@ async fn update_our_view( WireMessage::ViewUpdate(new_view), ).await?; - let our_view = OurView::new(live_heads.iter().cloned(), finalized_number); + let our_view = OurView::new( + live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)), + finalized_number, + ); dispatch_validation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view.clone()), ctx).await; @@ -684,7 +692,7 @@ mod tests { use sc_network::{Event as NetworkEvent, IfDisconnected}; - use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal}; + use polkadot_subsystem::{jaeger, ActiveLeavesUpdate, FromOverseer, OverseerSignal}; use polkadot_subsystem::messages::{ ApprovalDistributionMessage, BitfieldDistributionMessage, @@ -929,7 +937,11 @@ mod tests { let head = Hash::repeat_byte(1); virtual_overseer.send( FromOverseer::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::start_work(head, Arc::new(jaeger::Span::Disabled)), + ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: head, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }) )) ).await; @@ -984,7 +996,11 @@ mod tests { virtual_overseer.send( FromOverseer::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::start_work(hash_a, Arc::new(jaeger::Span::Disabled)), + ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: hash_a, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }) )) ).await; @@ -1046,7 +1062,11 @@ mod tests { // This should trigger the view update to our peers. virtual_overseer.send( FromOverseer::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::start_work(hash_a, Arc::new(jaeger::Span::Disabled)), + ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: hash_a, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }) )) ).await; @@ -1236,7 +1256,11 @@ mod tests { virtual_overseer.send( FromOverseer::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::start_work(hash_a, Arc::new(jaeger::Span::Disabled)), + ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: hash_a, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }) )) ).await; @@ -1429,7 +1453,11 @@ mod tests { ).await; virtual_overseer.send( FromOverseer::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::start_work(hash_b, Arc::new(jaeger::Span::Disabled)), + ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: hash_b, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }) )) ).await; @@ -1626,4 +1654,52 @@ mod tests { } assert_eq!(cnt, EXPECTED_COUNT); } + + #[test] + fn our_view_updates_decreasing_order_and_limited_to_max() { + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + .. + } = test_harness; + + + // to show that we're still connected on the collation protocol, send a view update. + + let hashes = (0..MAX_VIEW_HEADS * 3).map(|i| Hash::repeat_byte(i as u8)); + + virtual_overseer.send( + FromOverseer::Signal(OverseerSignal::ActiveLeaves( + // These are in reverse order, so the subsystem must sort internally to + // get the correct view. + ActiveLeavesUpdate { + activated: hashes.enumerate().map(|(i, h)| ActivatedLeaf { + hash: h, + number: i as _, + span: Arc::new(jaeger::Span::Disabled), + }).rev().collect(), + deactivated: Default::default(), + } + )) + ).await; + + let view_heads = (MAX_VIEW_HEADS * 2 .. MAX_VIEW_HEADS * 3).rev() + .map(|i| (Hash::repeat_byte(i as u8), Arc::new(jaeger::Span::Disabled)) ); + + let our_view = OurView::new( + view_heads, + 0, + ); + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::OurViewChange(our_view.clone()), + &mut virtual_overseer, + ).await; + + assert_sends_collation_event_to_all( + NetworkBridgeEvent::OurViewChange(our_view), + &mut virtual_overseer, + ).await; + }); + } } diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index a46da32b126..32d0cef7bf7 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -955,7 +955,7 @@ mod tests { use polkadot_subsystem::{ jaeger, messages::{RuntimeApiMessage, RuntimeApiRequest}, - ActiveLeavesUpdate, + ActiveLeavesUpdate, ActivatedLeaf, }; use polkadot_subsystem_testhelpers as test_helpers; @@ -1215,7 +1215,11 @@ mod tests { overseer_signal( virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: [(test_state.relay_parent, Arc::new(jaeger::Span::Disabled))][..].into(), + activated: vec![ActivatedLeaf { + hash: test_state.relay_parent, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }].into(), deactivated: [][..].into(), }), ).await; diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index bbda8f0a0dd..0d9fb55abcc 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -80,7 +80,7 @@ impl GossipSupport { })) => { tracing::trace!(target: LOG_TARGET, "active leaves signal"); - let leaves = activated.into_iter().map(|(h, _)| h); + let leaves = activated.into_iter().map(|a| a.hash); if let Err(e) = state.handle_active_leaves(&mut ctx, leaves).await { tracing::debug!(target: LOG_TARGET, error = ?e); } diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index bc8812b5727..add86b02aee 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -153,10 +153,11 @@ async fn handle_signal( OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { let _timer = state.metrics.time_handle_signal(); - for (relay_parent, span) in activated { - let _span = span.child("pov-dist") + for activated in activated { + let _span = activated.span.child("pov-dist") .with_stage(jaeger::Stage::PoVDistribution); + let relay_parent = activated.hash; match request_validators_ctx(relay_parent, ctx).await { Ok(vals_rx) => { let n_validators = match vals_rx.await? { diff --git a/polkadot/node/network/pov-distribution/src/tests.rs b/polkadot/node/network/pov-distribution/src/tests.rs index 6d07eac6b5d..cf545fc406d 100644 --- a/polkadot/node/network/pov-distribution/src/tests.rs +++ b/polkadot/node/network/pov-distribution/src/tests.rs @@ -25,7 +25,7 @@ use tracing::trace; use sp_keyring::Sr25519Keyring; use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId, ScheduledCore, SessionIndex, SessionInfo, ValidatorIndex}; -use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger}; +use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger, ActivatedLeaf}; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_node_network_protocol::{view, our_view}; @@ -275,7 +275,11 @@ fn ask_validators_for_povs() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: [(test_state.relay_parent, Arc::new(jaeger::Span::Disabled))][..].into(), + activated: vec![ActivatedLeaf { + hash: test_state.relay_parent, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }].into(), deactivated: [][..].into(), }), ).await; @@ -447,7 +451,11 @@ fn ask_validators_for_povs() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: [(next_leaf, Arc::new(jaeger::Span::Disabled))][..].into(), + activated: vec![ActivatedLeaf { + hash: next_leaf, + number: 2, + span: Arc::new(jaeger::Span::Disabled), + }].into(), deactivated: [current.clone()][..].into(), }) ).await; diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 97b60262cfd..22e82b33ca2 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -1016,8 +1016,9 @@ impl StatementDistribution { FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => { let _timer = metrics.time_active_leaves_update(); - for (relay_parent, span) in activated { - let span = PerLeafSpan::new(span, "statement-distribution"); + for activated in activated { + let relay_parent = activated.hash; + let span = PerLeafSpan::new(activated.span, "statement-distribution"); let (validators, session_index) = { let (val_tx, val_rx) = oneshot::channel(); @@ -1187,7 +1188,7 @@ mod tests { use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore}; use sc_keystore::LocalKeystore; use polkadot_node_network_protocol::{view, ObservedRole, our_view}; - use polkadot_subsystem::jaeger; + use polkadot_subsystem::{jaeger, ActivatedLeaf}; #[test] fn active_head_accepts_only_2_seconded_per_validator() { @@ -1743,7 +1744,11 @@ mod tests { let test_fut = async move { // register our active heads. handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: vec![(hash_a, Arc::new(jaeger::Span::Disabled))].into(), + activated: vec![ActivatedLeaf { + hash: hash_a, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }].into(), deactivated: vec![].into(), }))).await; diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index c2fe73514cc..ebe2f6b7f0a 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -90,7 +90,7 @@ use polkadot_subsystem::messages::{ }; pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, - SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem, jaeger, + SpawnedSubsystem, ActiveLeavesUpdate, ActivatedLeaf, DummySubsystem, jaeger, }; use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}, metered, Metronome}; use polkadot_node_primitives::SpawnNamed; @@ -1898,7 +1898,11 @@ where for (hash, number) in std::mem::take(&mut self.leaves) { let _ = self.active_leaves.insert(hash, number); let span = self.on_head_activated(&hash, None); - update.activated.push((hash, span)); + update.activated.push(ActivatedLeaf { + hash, + number, + span, + }); } if !update.is_empty() { @@ -1982,7 +1986,11 @@ where }; let span = self.on_head_activated(&block.hash, Some(block.parent_hash)); - let mut update = ActiveLeavesUpdate::start_work(block.hash, span); + let mut update = ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: block.hash, + number: block.number, + span + }); if let Some(number) = self.active_leaves.remove(&block.parent_hash) { debug_assert_eq!(block.number.saturating_sub(1), number); @@ -2602,16 +2610,25 @@ mod tests { handler.block_imported(third_block).await; let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work( - first_block_hash, - Arc::new(jaeger::Span::Disabled), - )), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: first_block_hash, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + })), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: [(second_block_hash, Arc::new(jaeger::Span::Disabled))].as_ref().into(), + activated: [ActivatedLeaf { + hash: second_block_hash, + number: 2, + span: Arc::new(jaeger::Span::Disabled), + }].as_ref().into(), deactivated: [first_block_hash].as_ref().into(), }), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: [(third_block_hash, Arc::new(jaeger::Span::Disabled))].as_ref().into(), + activated: [ActivatedLeaf { + hash: third_block_hash, + number: 3, + span: Arc::new(jaeger::Span::Disabled), + }].as_ref().into(), deactivated: [second_block_hash].as_ref().into(), }), ]; @@ -2700,8 +2717,16 @@ mod tests { let expected_heartbeats = vec![ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated: [ - (first_block_hash, Arc::new(jaeger::Span::Disabled)), - (second_block_hash, Arc::new(jaeger::Span::Disabled)), + ActivatedLeaf { + hash: first_block_hash, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }, + ActivatedLeaf { + hash: second_block_hash, + number: 2, + span: Arc::new(jaeger::Span::Disabled), + }, ].as_ref().into(), ..Default::default() }), @@ -2788,7 +2813,11 @@ mod tests { let expected_heartbeats = vec![ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated: [ - (imported_block.hash, Arc::new(jaeger::Span::Disabled)), + ActivatedLeaf { + hash: imported_block.hash, + number: imported_block.number, + span: Arc::new(jaeger::Span::Disabled) + } ].as_ref().into(), ..Default::default() }), diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 835fdc05e82..4b8b3d2c91d 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -776,15 +776,15 @@ where activated, deactivated, }))) => { - for (hash, span) in activated { + for activated in activated { let metrics = metrics.clone(); - if let Err(e) = jobs.spawn_job(hash, span, run_args.clone(), metrics) { + if let Err(e) = jobs.spawn_job(activated.hash, activated.span, run_args.clone(), metrics) { tracing::error!( job = Job::NAME, err = ?e, "failed to spawn a job", ); - Self::fwd_err(Some(hash), JobsError::Utility(e), err_tx).await; + Self::fwd_err(Some(activated.hash), JobsError::Utility(e), err_tx).await; return true; } } @@ -1048,7 +1048,7 @@ mod tests { use polkadot_node_jaeger as jaeger; use polkadot_node_subsystem::{ messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, - SpawnedSubsystem, + SpawnedSubsystem, ActivatedLeaf, }; use assert_matches::assert_matches; use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt}; @@ -1174,7 +1174,11 @@ mod tests { test_harness(true, |mut overseer_handle, err_rx| async move { overseer_handle .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::start_work(relay_parent, Arc::new(jaeger::Span::Disabled)), + ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: relay_parent, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }), ))) .await; assert_matches!( @@ -1203,7 +1207,11 @@ mod tests { test_harness(true, |mut overseer_handle, err_rx| async move { overseer_handle .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::start_work(relay_parent, Arc::new(jaeger::Span::Disabled)), + ActiveLeavesUpdate::start_work(ActivatedLeaf { + hash: relay_parent, + number: 1, + span: Arc::new(jaeger::Span::Disabled), + }), ))) .await; diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 825614e3b16..f7cba512aba 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -46,24 +46,35 @@ use self::messages::AllMessages; /// If there are greater than this number of slots, then we fall back to a heap vector. const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8; +/// Activated leaf. +#[derive(Debug, Clone)] +pub struct ActivatedLeaf { + /// The block hash. + pub hash: Hash, + /// The block number. + pub number: BlockNumber, + /// An associated [`jaeger::Span`]. + /// + /// NOTE: Each span should only be kept active as long as the leaf is considered active and should be dropped + /// when the leaf is deactivated. + pub span: Arc<jaeger::Span>, +} + /// Changes in the set of active leaves: the parachain heads which we care to work on. /// /// Note that the activated and deactivated fields indicate deltas, not complete sets. #[derive(Clone, Default)] pub struct ActiveLeavesUpdate { - /// New relay chain block hashes of interest and their associated [`jaeger::Span`]. - /// - /// NOTE: Each span should only be kept active as long as the leaf is considered active and should be dropped - /// when the leaf is deactivated. - pub activated: SmallVec<[(Hash, Arc<jaeger::Span>); ACTIVE_LEAVES_SMALLVEC_CAPACITY]>, + /// New relay chain blocks of interest. + pub activated: SmallVec<[ActivatedLeaf; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>, /// Relay chain block hashes no longer of interest. pub deactivated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>, } impl ActiveLeavesUpdate { /// Create a ActiveLeavesUpdate with a single activated hash - pub fn start_work(hash: Hash, span: Arc<jaeger::Span>) -> Self { - Self { activated: [(hash, span)][..].into(), ..Default::default() } + pub fn start_work(activated: ActivatedLeaf) -> Self { + Self { activated: [activated][..].into(), ..Default::default() } } /// Create a ActiveLeavesUpdate with a single deactivated hash @@ -83,17 +94,17 @@ impl PartialEq for ActiveLeavesUpdate { /// Instead, it means equality when `activated` and `deactivated` are considered as sets. fn eq(&self, other: &Self) -> bool { self.activated.len() == other.activated.len() && self.deactivated.len() == other.deactivated.len() - && self.activated.iter().all(|a| other.activated.iter().any(|o| a.0 == o.0)) + && self.activated.iter().all(|a| other.activated.iter().any(|o| a.hash == o.hash)) && self.deactivated.iter().all(|a| other.deactivated.contains(a)) } } impl fmt::Debug for ActiveLeavesUpdate { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - struct Activated<'a>(&'a [(Hash, Arc<jaeger::Span>)]); + struct Activated<'a>(&'a [ActivatedLeaf]); impl fmt::Debug for Activated<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_list().entries(self.0.iter().map(|e| e.0)).finish() + f.debug_list().entries(self.0.iter().map(|e| e.hash)).finish() } } diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 9ffab8a551c..cedf01d3f88 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -30,7 +30,7 @@ Indicates a change in active leaves. Activated leaves should have jobs, whereas ```rust struct ActiveLeavesUpdate { - activated: [Hash], // in practice, these should probably be a SmallVec + activated: [(Hash, Number)], // in practice, these should probably be a SmallVec deactivated: [Hash], } ``` -- GitLab