From 106bd929cecc08e71720f71fbcb32cd532d67815 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com> Date: Mon, 27 Jul 2020 10:39:52 +0200 Subject: [PATCH] add ActiveLeavesUpdate, remove StartWork, StopWork (#1458) * add ActiveLeavesUpdate, remove StartWork, StopWork * replace StartWork, StopWork in subsystem crate tests * mechanically update OverseerSignal in other modules * convert overseer to take advantage of new multi-hash update abilities Note: this does not yet convert the tests; some of the tests now freeze: test tests::overseer_start_stop_works ... test tests::overseer_start_stop_works has been running for over 60 seconds test tests::overseer_finalize_works ... test tests::overseer_finalize_works has been running for over 60 seconds * fix broken overseer tests * manually impl PartialEq for ActiveLeavesUpdate, rm trait Equivalent This cleans up the code a bit and makes it easier in the future to do the right thing when comparing ALUs. * use target in all network bridge logging * reduce spamming of and --- polkadot/Cargo.lock | 1 + polkadot/node/core/backing/src/lib.rs | 10 ++-- .../network/bitfield-distribution/src/lib.rs | 41 +++++++++------- polkadot/node/network/bridge/Cargo.toml | 3 +- polkadot/node/network/bridge/src/lib.rs | 48 +++++++----------- .../node/network/pov-distribution/src/lib.rs | 35 ++++++------- .../network/statement-distribution/src/lib.rs | 43 ++++++++-------- polkadot/node/overseer/src/lib.rs | 49 ++++++++++++------- polkadot/node/subsystem/Cargo.toml | 1 + polkadot/node/subsystem/src/lib.rs | 47 ++++++++++++++++-- polkadot/node/subsystem/src/util.rs | 35 +++++++------ 11 files changed, 181 insertions(+), 132 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 8b0b4066dab..046d2ac4530 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4670,6 +4670,7 @@ dependencies = [ "polkadot-statement-table", "sc-keystore", "sc-network", + "smallvec 1.4.1", "sp-core", "streamunordered", ] diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 0022a03f76f..89da1716dc0 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -778,7 +778,7 @@ mod tests { }; use polkadot_subsystem::{ messages::{RuntimeApiRequest, SchedulerRoster}, - FromOverseer, OverseerSignal, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, }; use sp_keyring::Sr25519Keyring; use std::collections::HashMap; @@ -968,7 +968,7 @@ mod tests { ) { // Start work on some new parent. virtual_overseer.send(FromOverseer::Signal( - OverseerSignal::StartWork(test_state.relay_parent)) + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(test_state.relay_parent))) ).await; // Check that subsystem job issues a request for a validator set. @@ -1084,7 +1084,7 @@ mod tests { ); virtual_overseer.send(FromOverseer::Signal( - OverseerSignal::StopWork(test_state.relay_parent)) + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; }); } @@ -1202,7 +1202,7 @@ mod tests { assert_eq!(backed[0].0.validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0]); virtual_overseer.send(FromOverseer::Signal( - OverseerSignal::StopWork(test_state.relay_parent)) + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; }); } @@ -1476,7 +1476,7 @@ mod tests { ); virtual_overseer.send(FromOverseer::Signal( - OverseerSignal::StopWork(test_state.relay_parent)) + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; }); } diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 1fce48d4985..9517504c1f5 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -28,7 +28,7 @@ use node_primitives::{ProtocolId, View}; use log::{trace, warn}; use polkadot_subsystem::messages::*; use polkadot_subsystem::{ - FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, }; use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId}; use sc_network::ReputationChange; @@ -157,24 +157,27 @@ impl BitfieldDistribution { warn!(target: "bitd", "Failed to handle incomming network messages: {:?}", e); } } - FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => { - trace!(target: "bitd", "Start {:?}", relay_parent); - // query basic system parameters once - let (validator_set, signing_context) = - query_basics(&mut ctx, relay_parent).await?; - - let _ = state.per_relay_parent.insert( - relay_parent, - PerRelayParentData { - signing_context, - validator_set, - ..Default::default() - }, - ); - } - FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => { - trace!(target: "bitd", "Stop {:?}", relay_parent); - // defer the cleanup to the view change + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => { + for relay_parent in activated { + trace!(target: "bitd", "Start {:?}", relay_parent); + // query basic system parameters once + let (validator_set, signing_context) = + query_basics(&mut ctx, relay_parent).await?; + + let _ = state.per_relay_parent.insert( + relay_parent, + PerRelayParentData { + signing_context, + validator_set, + ..Default::default() + }, + ); + } + + for relay_parent in deactivated { + trace!(target: "bitd", "Stop {:?}", relay_parent); + // defer the cleanup to the view change + } } FromOverseer::Signal(OverseerSignal::Conclude) => { trace!(target: "bitd", "Conclude"); diff --git a/polkadot/node/network/bridge/Cargo.toml b/polkadot/node/network/bridge/Cargo.toml index 3bd4b6702b0..25a9dc62c24 100644 --- a/polkadot/node/network/bridge/Cargo.toml +++ b/polkadot/node/network/bridge/Cargo.toml @@ -17,6 +17,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } [dev-dependencies] -parking_lot = "0.10.0" assert_matches = "1.3.0" +parking_lot = "0.10.0" +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = [ "test-helpers" ] } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 6703be236bd..a1f56945687 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -28,7 +28,7 @@ use sc_network::{ use sp_runtime::ConsensusEngineId; use polkadot_subsystem::{ - FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, SubsystemResult, }; use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, AllMessages}; @@ -57,6 +57,9 @@ const UNKNOWN_PROTO_COST: ReputationChange const MALFORMED_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Malformed view"); +// network bridge log target +const TARGET: &'static str = "network_bridge"; + /// Messages received on the network. #[derive(Debug, Encode, Decode, Clone)] pub enum WireMessage { @@ -203,8 +206,7 @@ enum Action { RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages), SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>), ReportPeer(PeerId, ReputationChange), - StartWork(Hash), - StopWork(Hash), + ActiveLeaves(ActiveLeavesUpdate), PeerConnected(PeerId, ObservedRole), PeerDisconnected(PeerId), @@ -217,10 +219,8 @@ fn action_from_overseer_message( res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>, ) -> Action { match res { - Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))) - => Action::StartWork(relay_parent), - Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))) - => Action::StopWork(relay_parent), + Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) + => Action::ActiveLeaves(active_leaves), Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort, Ok(FromOverseer::Communication { msg }) => match msg { NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer) @@ -230,7 +230,7 @@ fn action_from_overseer_message( => Action::SendMessage(peers, protocol, message), }, Err(e) => { - log::warn!("Shutting down Network Bridge due to error {:?}", e); + log::warn!(target: TARGET, "Shutting down Network Bridge due to error {:?}", e); Action::Abort } } @@ -239,7 +239,7 @@ fn action_from_overseer_message( fn action_from_network_message(event: Option<NetworkEvent>) -> Option<Action> { match event { None => { - log::info!("Shutting down Network Bridge: underlying event stream concluded"); + log::info!(target: TARGET, "Shutting down Network Bridge: underlying event stream concluded"); Some(Action::Abort) } Some(NetworkEvent::Dht(_)) => None, @@ -392,23 +392,10 @@ async fn run_network<N: Network>( Action::ReportPeer(peer, rep) => { net.report_peer(peer, rep).await?; } - Action::StartWork(relay_parent) => { - live_heads.push(relay_parent); - if let Some(view_update) - = update_view(&peers, &live_heads, &mut net, &mut local_view).await? - { - if let Err(e) = dispatch_update_to_all( - view_update, - event_producers.values(), - &mut ctx, - ).await { - log::warn!("Aborting - Failure to dispatch messages to overseer"); - return Err(e) - } - } - } - Action::StopWork(relay_parent) => { - live_heads.retain(|h| h != &relay_parent); + Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { + live_heads.extend(activated); + live_heads.retain(|h| !deactivated.contains(h)); + if let Some(view_update) = update_view(&peers, &live_heads, &mut net, &mut local_view).await? { @@ -417,12 +404,11 @@ async fn run_network<N: Network>( event_producers.values(), &mut ctx, ).await { - log::warn!("Aborting - Failure to dispatch messages to overseer"); + log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer"); return Err(e) } } } - Action::PeerConnected(peer, role) => { match peers.entry(peer.clone()) { HEntry::Occupied(_) => continue, @@ -450,7 +436,7 @@ async fn run_network<N: Network>( event_producers.values(), &mut ctx, ).await { - log::warn!("Aborting - Failure to dispatch messages to overseer"); + log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer"); return Err(e) } } @@ -510,7 +496,7 @@ async fn run_network<N: Network>( let send_messages = ctx.send_messages(outgoing_messages); if let Err(e) = send_messages.await { - log::warn!("Aborting - Failure to dispatch messages to overseer"); + log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer"); return Err(e) } }, @@ -670,7 +656,7 @@ mod tests { let hash_a = Hash::from([1; 32]); - virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))).await; + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_a)))).await; let actions = network_handle.next_network_actions(2).await; let wire_message = WireMessage::ViewUpdate(View(vec![hash_a])).encode(); diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index e222a89f494..55e3755266c 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -21,7 +21,7 @@ use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor}; use polkadot_subsystem::{ - OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem, + ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem, }; use polkadot_subsystem::messages::{ PoVDistributionMessage, NetworkBridgeEvent, ReputationChange as Rep, PeerId, @@ -107,23 +107,24 @@ async fn handle_signal( ) -> SubsystemResult<bool> { match signal { OverseerSignal::Conclude => Ok(true), - OverseerSignal::StartWork(relay_parent) => { - let (vals_tx, vals_rx) = oneshot::channel(); - ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(vals_tx), - ))).await?; - - state.relay_parent_state.insert(relay_parent, BlockBasedState { - known: HashMap::new(), - fetching: HashMap::new(), - n_validators: vals_rx.await?.len(), - }); + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { + for relay_parent in activated { + let (vals_tx, vals_rx) = oneshot::channel(); + ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(vals_tx), + ))).await?; - Ok(false) - } - OverseerSignal::StopWork(relay_parent) => { - state.relay_parent_state.remove(&relay_parent); + state.relay_parent_state.insert(relay_parent, BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: vals_rx.await?.len(), + }); + } + + for relay_parent in deactivated { + state.relay_parent_state.remove(&relay_parent); + } Ok(false) } diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index f57f2745b83..5d9445a9207 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -21,7 +21,7 @@ use polkadot_subsystem::{ Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem, - FromOverseer, OverseerSignal, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, }; use polkadot_subsystem::messages::{ AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage, @@ -840,30 +840,29 @@ async fn run( loop { let message = ctx.recv().await?; match message { - FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => { - let (validators, session_index) = { - let (val_tx, val_rx) = oneshot::channel(); - let (session_tx, session_rx) = oneshot::channel(); + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => { + for relay_parent in activated { + let (validators, session_index) = { + let (val_tx, val_rx) = oneshot::channel(); + let (session_tx, session_rx) = oneshot::channel(); + + let val_message = AllMessages::RuntimeApi( + RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Validators(val_tx)), + ); + let session_message = AllMessages::RuntimeApi( + RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::SigningContext(session_tx)), + ); - let val_message = AllMessages::RuntimeApi( - RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Validators(val_tx)), - ); - let session_message = AllMessages::RuntimeApi( - RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::SigningContext(session_tx)), - ); + ctx.send_messages( + std::iter::once(val_message).chain(std::iter::once(session_message)) + ).await?; - ctx.send_messages( - std::iter::once(val_message).chain(std::iter::once(session_message)) - ).await?; + (val_rx.await?, session_rx.await?.session_index) + }; - (val_rx.await?, session_rx.await?.session_index) - }; - - active_heads.entry(relay_parent) - .or_insert(ActiveHeadData::new(validators, session_index)); - } - FromOverseer::Signal(OverseerSignal::StopWork(_relay_parent)) => { - // do nothing - we will handle this when our view changes. + active_heads.entry(relay_parent) + .or_insert(ActiveHeadData::new(validators, session_index)); + } } FromOverseer::Signal(OverseerSignal::Conclude) => break, FromOverseer::Communication { msg } => match msg { diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 6262e65cc44..d7b8b1df6ff 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -83,7 +83,7 @@ use polkadot_subsystem::messages::{ }; pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, - SpawnedSubsystem, + SpawnedSubsystem, ActiveLeavesUpdate, }; use polkadot_node_primitives::SpawnNamed; @@ -726,12 +726,15 @@ where /// Run the `Overseer`. pub async fn run(mut self) -> SubsystemResult<()> { let leaves = std::mem::take(&mut self.leaves); + let mut update = ActiveLeavesUpdate::default(); for leaf in leaves.into_iter() { - self.broadcast_signal(OverseerSignal::StartWork(leaf.0)).await?; + update.activated.push(leaf.0); self.active_leaves.insert(leaf); } + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + loop { while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) { match msg { @@ -775,33 +778,35 @@ where } async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { + let mut update = ActiveLeavesUpdate::default(); + if let Some(parent) = self.active_leaves.take(&(block.parent_hash, block.number - 1)) { - self.broadcast_signal(OverseerSignal::StopWork(parent.0)).await?; + update.deactivated.push(parent.0); } if !self.active_leaves.contains(&(block.hash, block.number)) { - self.broadcast_signal(OverseerSignal::StartWork(block.hash)).await?; + update.activated.push(block.hash); self.active_leaves.insert((block.hash, block.number)); } + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + Ok(()) } async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { - let mut stop_these = Vec::new(); + let mut update = ActiveLeavesUpdate::default(); self.active_leaves.retain(|(h, n)| { if *n <= block.number { - stop_these.push(*h); + update.deactivated.push(*h); false } else { true } }); - for hash in stop_these.into_iter() { - self.broadcast_signal(OverseerSignal::StopWork(hash)).await? - } + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; Ok(()) } @@ -1273,11 +1278,15 @@ mod tests { handler.block_imported(third_block).await.unwrap(); let expected_heartbeats = vec![ - OverseerSignal::StartWork(first_block_hash), - OverseerSignal::StopWork(first_block_hash), - OverseerSignal::StartWork(second_block_hash), - OverseerSignal::StopWork(second_block_hash), - OverseerSignal::StartWork(third_block_hash), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(first_block_hash)), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: [second_block_hash].as_ref().into(), + deactivated: [first_block_hash].as_ref().into(), + }), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: [third_block_hash].as_ref().into(), + deactivated: [second_block_hash].as_ref().into(), + }), ]; loop { @@ -1371,10 +1380,14 @@ mod tests { handler.block_finalized(third_block).await.unwrap(); let expected_heartbeats = vec![ - OverseerSignal::StartWork(first_block_hash), - OverseerSignal::StartWork(second_block_hash), - OverseerSignal::StopWork(first_block_hash), - OverseerSignal::StopWork(second_block_hash), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: [first_block_hash, second_block_hash].as_ref().into(), + ..Default::default() + }), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + deactivated: [first_block_hash, second_block_hash].as_ref().into(), + ..Default::default() + }), ]; loop { diff --git a/polkadot/node/subsystem/Cargo.toml b/polkadot/node/subsystem/Cargo.toml index 701a197c2fc..3befba7f7b2 100644 --- a/polkadot/node/subsystem/Cargo.toml +++ b/polkadot/node/subsystem/Cargo.toml @@ -19,6 +19,7 @@ polkadot-node-primitives = { path = "../primitives" } polkadot-primitives = { path = "../../primitives" } polkadot-statement-table = { path = "../../statement-table" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } +smallvec = "1.4.1" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } streamunordered = "0.5.1" diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 430a8418d90..0bdc9ffcf8b 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -30,6 +30,7 @@ use futures::future::BoxFuture; use polkadot_primitives::v1::Hash; use async_trait::async_trait; +use smallvec::SmallVec; use crate::messages::AllMessages; @@ -38,13 +39,51 @@ pub mod util; #[cfg(any(test, feature = "test-helpers"))] pub mod test_helpers; +/// How many slots are stack-reserved for active leaves updates +/// +/// If there are fewer than this number of slots, then we've wasted some stack space. +/// If there are greater than this number of slots, then we fall back to a heap vector. +const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8; + +/// Changes in the set of active leaves: the parachain heads which we care to work on. +/// +/// Note that the activated and deactivated fields indicate deltas, not complete sets. +#[derive(Clone, Debug, Default, Eq)] +pub struct ActiveLeavesUpdate { + /// New relay chain block hashes of interest. + pub activated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>, + /// Relay chain block hashes no longer of interest. + pub deactivated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>, +} + +impl ActiveLeavesUpdate { + /// Create a ActiveLeavesUpdate with a single activated hash + pub fn start_work(hash: Hash) -> Self { + Self { activated: [hash].as_ref().into(), ..Default::default() } + } + + /// Create a ActiveLeavesUpdate with a single deactivated hash + pub fn stop_work(hash: Hash) -> Self { + Self { deactivated: [hash].as_ref().into(), ..Default::default() } + } +} + +impl PartialEq for ActiveLeavesUpdate { + /// Equality for `ActiveLeavesUpdate` doesnt imply bitwise equality. + /// + /// Instead, it means equality when `activated` and `deactivated` are considered as sets. + fn eq(&self, other: &Self) -> bool { + use std::collections::HashSet; + self.activated.iter().collect::<HashSet<_>>() == other.activated.iter().collect::<HashSet<_>>() && + self.deactivated.iter().collect::<HashSet<_>>() == other.deactivated.iter().collect::<HashSet<_>>() + } +} + /// Signals sent by an overseer to a subsystem. #[derive(PartialEq, Clone, Debug)] pub enum OverseerSignal { - /// `Subsystem` should start working on block-based work, given by the relay-chain block hash. - StartWork(Hash), - /// `Subsystem` should stop working on block-based work specified by the relay-chain block hash. - StopWork(Hash), + /// Subsystems should adjust their jobs to start and stop work on appropriate block hashes. + ActiveLeaves(ActiveLeavesUpdate), /// Conclude the work of the `Overseer` and all `Subsystem`s. Conclude, } diff --git a/polkadot/node/subsystem/src/util.rs b/polkadot/node/subsystem/src/util.rs index ee5cf2ded28..d37e46ad05c 100644 --- a/polkadot/node/subsystem/src/util.rs +++ b/polkadot/node/subsystem/src/util.rs @@ -602,21 +602,25 @@ where err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>> ) -> bool { use crate::FromOverseer::{Communication, Signal}; - use crate::OverseerSignal::{Conclude, StartWork, StopWork}; + use crate::ActiveLeavesUpdate; + use crate::OverseerSignal::{Conclude, ActiveLeaves}; match incoming { - Ok(Signal(StartWork(hash))) => { - if let Err(e) = jobs.spawn_job(hash, run_args.clone()) { - log::error!("Failed to spawn a job: {:?}", e); - Self::fwd_err(Some(hash), e.into(), err_tx).await; - return true; + Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }))) => { + for hash in activated { + if let Err(e) = jobs.spawn_job(hash, run_args.clone()) { + log::error!("Failed to spawn a job: {:?}", e); + Self::fwd_err(Some(hash), e.into(), err_tx).await; + return true; + } } - } - Ok(Signal(StopWork(hash))) => { - if let Err(e) = jobs.stop_job(hash).await { - log::error!("Failed to stop a job: {:?}", e); - Self::fwd_err(Some(hash), e.into(), err_tx).await; - return true; + + for hash in deactivated { + if let Err(e) = jobs.stop_job(hash).await { + log::error!("Failed to stop a job: {:?}", e); + Self::fwd_err(Some(hash), e.into(), err_tx).await; + return true; + } } } Ok(Signal(Conclude)) => { @@ -725,6 +729,7 @@ mod tests { JobTrait, ToJobTrait, }, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, }; @@ -920,12 +925,12 @@ mod tests { run_args.insert(relay_parent.clone(), vec![FromJob::Test(test_message.clone())]); test_harness(run_args, |mut overseer_handle, err_rx| async move { - overseer_handle.send(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))).await; + overseer_handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(relay_parent)))).await; assert_matches!( overseer_handle.recv().await, AllMessages::Test(msg) if msg == test_message ); - overseer_handle.send(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))).await; + overseer_handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(relay_parent)))).await; let errs: Vec<_> = err_rx.collect().await; assert_eq!(errs.len(), 0); @@ -938,7 +943,7 @@ mod tests { let run_args = HashMap::new(); test_harness(run_args, |mut overseer_handle, err_rx| async move { - overseer_handle.send(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))).await; + overseer_handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(relay_parent)))).await; let errs: Vec<_> = err_rx.collect().await; assert_eq!(errs.len(), 1); -- GitLab