diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 8b0b4066dab8ac718e7f14f70724132620c240ce..046d2ac45300c61cb3bb507e226d595aa406e30c 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4670,6 +4670,7 @@ dependencies = [ "polkadot-statement-table", "sc-keystore", "sc-network", + "smallvec 1.4.1", "sp-core", "streamunordered", ] diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 0022a03f76f0b6fe721094b2fa1a32c9ddefde44..89da1716dc0733b739fd16a156346a9a5c7f0e1c 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -778,7 +778,7 @@ mod tests { }; use polkadot_subsystem::{ messages::{RuntimeApiRequest, SchedulerRoster}, - FromOverseer, OverseerSignal, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, }; use sp_keyring::Sr25519Keyring; use std::collections::HashMap; @@ -968,7 +968,7 @@ mod tests { ) { // Start work on some new parent. virtual_overseer.send(FromOverseer::Signal( - OverseerSignal::StartWork(test_state.relay_parent)) + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(test_state.relay_parent))) ).await; // Check that subsystem job issues a request for a validator set. @@ -1084,7 +1084,7 @@ mod tests { ); virtual_overseer.send(FromOverseer::Signal( - OverseerSignal::StopWork(test_state.relay_parent)) + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; }); } @@ -1202,7 +1202,7 @@ mod tests { assert_eq!(backed[0].0.validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0]); virtual_overseer.send(FromOverseer::Signal( - OverseerSignal::StopWork(test_state.relay_parent)) + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; }); } @@ -1476,7 +1476,7 @@ mod tests { ); virtual_overseer.send(FromOverseer::Signal( - OverseerSignal::StopWork(test_state.relay_parent)) + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent))) ).await; }); } diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 1fce48d4985e34da01e75544181489d76762bc42..9517504c1f521525526ef8bae836cdc46e1394ac 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -28,7 +28,7 @@ use node_primitives::{ProtocolId, View}; use log::{trace, warn}; use polkadot_subsystem::messages::*; use polkadot_subsystem::{ - FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, }; use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId}; use sc_network::ReputationChange; @@ -157,24 +157,27 @@ impl BitfieldDistribution { warn!(target: "bitd", "Failed to handle incomming network messages: {:?}", e); } } - FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => { - trace!(target: "bitd", "Start {:?}", relay_parent); - // query basic system parameters once - let (validator_set, signing_context) = - query_basics(&mut ctx, relay_parent).await?; - - let _ = state.per_relay_parent.insert( - relay_parent, - PerRelayParentData { - signing_context, - validator_set, - ..Default::default() - }, - ); - } - FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => { - trace!(target: "bitd", "Stop {:?}", relay_parent); - // defer the cleanup to the view change + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => { + for relay_parent in activated { + trace!(target: "bitd", "Start {:?}", relay_parent); + // query basic system parameters once + let (validator_set, signing_context) = + query_basics(&mut ctx, relay_parent).await?; + + let _ = state.per_relay_parent.insert( + relay_parent, + PerRelayParentData { + signing_context, + validator_set, + ..Default::default() + }, + ); + } + + for relay_parent in deactivated { + trace!(target: "bitd", "Stop {:?}", relay_parent); + // defer the cleanup to the view change + } } FromOverseer::Signal(OverseerSignal::Conclude) => { trace!(target: "bitd", "Conclude"); diff --git a/polkadot/node/network/bridge/Cargo.toml b/polkadot/node/network/bridge/Cargo.toml index 3bd4b6702b0b5a6a1c57aa86a24368af9e943d07..25a9dc62c24070c09bfaffd36c6eaafc13819b39 100644 --- a/polkadot/node/network/bridge/Cargo.toml +++ b/polkadot/node/network/bridge/Cargo.toml @@ -17,6 +17,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } [dev-dependencies] -parking_lot = "0.10.0" assert_matches = "1.3.0" +parking_lot = "0.10.0" +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = [ "test-helpers" ] } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 6703be236bd50e992ad69c285d30b3f2f87d0149..a1f56945687a48c526f38e5461accbe079ecd698 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -28,7 +28,7 @@ use sc_network::{ use sp_runtime::ConsensusEngineId; use polkadot_subsystem::{ - FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, SubsystemResult, }; use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, AllMessages}; @@ -57,6 +57,9 @@ const UNKNOWN_PROTO_COST: ReputationChange const MALFORMED_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Malformed view"); +// network bridge log target +const TARGET: &'static str = "network_bridge"; + /// Messages received on the network. #[derive(Debug, Encode, Decode, Clone)] pub enum WireMessage { @@ -203,8 +206,7 @@ enum Action { RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages), SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>), ReportPeer(PeerId, ReputationChange), - StartWork(Hash), - StopWork(Hash), + ActiveLeaves(ActiveLeavesUpdate), PeerConnected(PeerId, ObservedRole), PeerDisconnected(PeerId), @@ -217,10 +219,8 @@ fn action_from_overseer_message( res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>, ) -> Action { match res { - Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))) - => Action::StartWork(relay_parent), - Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))) - => Action::StopWork(relay_parent), + Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) + => Action::ActiveLeaves(active_leaves), Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort, Ok(FromOverseer::Communication { msg }) => match msg { NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer) @@ -230,7 +230,7 @@ fn action_from_overseer_message( => Action::SendMessage(peers, protocol, message), }, Err(e) => { - log::warn!("Shutting down Network Bridge due to error {:?}", e); + log::warn!(target: TARGET, "Shutting down Network Bridge due to error {:?}", e); Action::Abort } } @@ -239,7 +239,7 @@ fn action_from_overseer_message( fn action_from_network_message(event: Option<NetworkEvent>) -> Option<Action> { match event { None => { - log::info!("Shutting down Network Bridge: underlying event stream concluded"); + log::info!(target: TARGET, "Shutting down Network Bridge: underlying event stream concluded"); Some(Action::Abort) } Some(NetworkEvent::Dht(_)) => None, @@ -392,23 +392,10 @@ async fn run_network<N: Network>( Action::ReportPeer(peer, rep) => { net.report_peer(peer, rep).await?; } - Action::StartWork(relay_parent) => { - live_heads.push(relay_parent); - if let Some(view_update) - = update_view(&peers, &live_heads, &mut net, &mut local_view).await? - { - if let Err(e) = dispatch_update_to_all( - view_update, - event_producers.values(), - &mut ctx, - ).await { - log::warn!("Aborting - Failure to dispatch messages to overseer"); - return Err(e) - } - } - } - Action::StopWork(relay_parent) => { - live_heads.retain(|h| h != &relay_parent); + Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { + live_heads.extend(activated); + live_heads.retain(|h| !deactivated.contains(h)); + if let Some(view_update) = update_view(&peers, &live_heads, &mut net, &mut local_view).await? { @@ -417,12 +404,11 @@ async fn run_network<N: Network>( event_producers.values(), &mut ctx, ).await { - log::warn!("Aborting - Failure to dispatch messages to overseer"); + log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer"); return Err(e) } } } - Action::PeerConnected(peer, role) => { match peers.entry(peer.clone()) { HEntry::Occupied(_) => continue, @@ -450,7 +436,7 @@ async fn run_network<N: Network>( event_producers.values(), &mut ctx, ).await { - log::warn!("Aborting - Failure to dispatch messages to overseer"); + log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer"); return Err(e) } } @@ -510,7 +496,7 @@ async fn run_network<N: Network>( let send_messages = ctx.send_messages(outgoing_messages); if let Err(e) = send_messages.await { - log::warn!("Aborting - Failure to dispatch messages to overseer"); + log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer"); return Err(e) } }, @@ -670,7 +656,7 @@ mod tests { let hash_a = Hash::from([1; 32]); - virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))).await; + virtual_overseer.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_a)))).await; let actions = network_handle.next_network_actions(2).await; let wire_message = WireMessage::ViewUpdate(View(vec![hash_a])).encode(); diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index e222a89f494619584379c021e0e62e5fc80ad030..55e3755266c877f295086f84d872cfd18770c531 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -21,7 +21,7 @@ use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor}; use polkadot_subsystem::{ - OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem, + ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem, }; use polkadot_subsystem::messages::{ PoVDistributionMessage, NetworkBridgeEvent, ReputationChange as Rep, PeerId, @@ -107,23 +107,24 @@ async fn handle_signal( ) -> SubsystemResult<bool> { match signal { OverseerSignal::Conclude => Ok(true), - OverseerSignal::StartWork(relay_parent) => { - let (vals_tx, vals_rx) = oneshot::channel(); - ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Validators(vals_tx), - ))).await?; - - state.relay_parent_state.insert(relay_parent, BlockBasedState { - known: HashMap::new(), - fetching: HashMap::new(), - n_validators: vals_rx.await?.len(), - }); + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { + for relay_parent in activated { + let (vals_tx, vals_rx) = oneshot::channel(); + ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(vals_tx), + ))).await?; - Ok(false) - } - OverseerSignal::StopWork(relay_parent) => { - state.relay_parent_state.remove(&relay_parent); + state.relay_parent_state.insert(relay_parent, BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: vals_rx.await?.len(), + }); + } + + for relay_parent in deactivated { + state.relay_parent_state.remove(&relay_parent); + } Ok(false) } diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index f57f2745b8371c021aa6d8bb0f04fa8a363c02ce..5d9445a9207fe80ca7e00ba88db77472a5c0d93a 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -21,7 +21,7 @@ use polkadot_subsystem::{ Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem, - FromOverseer, OverseerSignal, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, }; use polkadot_subsystem::messages::{ AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage, @@ -840,30 +840,29 @@ async fn run( loop { let message = ctx.recv().await?; match message { - FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => { - let (validators, session_index) = { - let (val_tx, val_rx) = oneshot::channel(); - let (session_tx, session_rx) = oneshot::channel(); + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => { + for relay_parent in activated { + let (validators, session_index) = { + let (val_tx, val_rx) = oneshot::channel(); + let (session_tx, session_rx) = oneshot::channel(); + + let val_message = AllMessages::RuntimeApi( + RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Validators(val_tx)), + ); + let session_message = AllMessages::RuntimeApi( + RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::SigningContext(session_tx)), + ); - let val_message = AllMessages::RuntimeApi( - RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Validators(val_tx)), - ); - let session_message = AllMessages::RuntimeApi( - RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::SigningContext(session_tx)), - ); + ctx.send_messages( + std::iter::once(val_message).chain(std::iter::once(session_message)) + ).await?; - ctx.send_messages( - std::iter::once(val_message).chain(std::iter::once(session_message)) - ).await?; + (val_rx.await?, session_rx.await?.session_index) + }; - (val_rx.await?, session_rx.await?.session_index) - }; - - active_heads.entry(relay_parent) - .or_insert(ActiveHeadData::new(validators, session_index)); - } - FromOverseer::Signal(OverseerSignal::StopWork(_relay_parent)) => { - // do nothing - we will handle this when our view changes. + active_heads.entry(relay_parent) + .or_insert(ActiveHeadData::new(validators, session_index)); + } } FromOverseer::Signal(OverseerSignal::Conclude) => break, FromOverseer::Communication { msg } => match msg { diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 6262e65cc44b87f091bae65436aa8b3b82c91298..d7b8b1df6ff8605dabd812c7ee746f5d3c1a8467 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -83,7 +83,7 @@ use polkadot_subsystem::messages::{ }; pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, - SpawnedSubsystem, + SpawnedSubsystem, ActiveLeavesUpdate, }; use polkadot_node_primitives::SpawnNamed; @@ -726,12 +726,15 @@ where /// Run the `Overseer`. pub async fn run(mut self) -> SubsystemResult<()> { let leaves = std::mem::take(&mut self.leaves); + let mut update = ActiveLeavesUpdate::default(); for leaf in leaves.into_iter() { - self.broadcast_signal(OverseerSignal::StartWork(leaf.0)).await?; + update.activated.push(leaf.0); self.active_leaves.insert(leaf); } + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + loop { while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) { match msg { @@ -775,33 +778,35 @@ where } async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { + let mut update = ActiveLeavesUpdate::default(); + if let Some(parent) = self.active_leaves.take(&(block.parent_hash, block.number - 1)) { - self.broadcast_signal(OverseerSignal::StopWork(parent.0)).await?; + update.deactivated.push(parent.0); } if !self.active_leaves.contains(&(block.hash, block.number)) { - self.broadcast_signal(OverseerSignal::StartWork(block.hash)).await?; + update.activated.push(block.hash); self.active_leaves.insert((block.hash, block.number)); } + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + Ok(()) } async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { - let mut stop_these = Vec::new(); + let mut update = ActiveLeavesUpdate::default(); self.active_leaves.retain(|(h, n)| { if *n <= block.number { - stop_these.push(*h); + update.deactivated.push(*h); false } else { true } }); - for hash in stop_these.into_iter() { - self.broadcast_signal(OverseerSignal::StopWork(hash)).await? - } + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; Ok(()) } @@ -1273,11 +1278,15 @@ mod tests { handler.block_imported(third_block).await.unwrap(); let expected_heartbeats = vec![ - OverseerSignal::StartWork(first_block_hash), - OverseerSignal::StopWork(first_block_hash), - OverseerSignal::StartWork(second_block_hash), - OverseerSignal::StopWork(second_block_hash), - OverseerSignal::StartWork(third_block_hash), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(first_block_hash)), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: [second_block_hash].as_ref().into(), + deactivated: [first_block_hash].as_ref().into(), + }), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: [third_block_hash].as_ref().into(), + deactivated: [second_block_hash].as_ref().into(), + }), ]; loop { @@ -1371,10 +1380,14 @@ mod tests { handler.block_finalized(third_block).await.unwrap(); let expected_heartbeats = vec![ - OverseerSignal::StartWork(first_block_hash), - OverseerSignal::StartWork(second_block_hash), - OverseerSignal::StopWork(first_block_hash), - OverseerSignal::StopWork(second_block_hash), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: [first_block_hash, second_block_hash].as_ref().into(), + ..Default::default() + }), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + deactivated: [first_block_hash, second_block_hash].as_ref().into(), + ..Default::default() + }), ]; loop { diff --git a/polkadot/node/subsystem/Cargo.toml b/polkadot/node/subsystem/Cargo.toml index 701a197c2fce596555042891097db90b74e5db4e..3befba7f7b27e6e79f22a2a99da129f7583f8be7 100644 --- a/polkadot/node/subsystem/Cargo.toml +++ b/polkadot/node/subsystem/Cargo.toml @@ -19,6 +19,7 @@ polkadot-node-primitives = { path = "../primitives" } polkadot-primitives = { path = "../../primitives" } polkadot-statement-table = { path = "../../statement-table" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } +smallvec = "1.4.1" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } streamunordered = "0.5.1" diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 430a8418d90b47d7678e8c16e45495e29b491635..0bdc9ffcf8b40eed6aa900f3364a3093d2c523cf 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -30,6 +30,7 @@ use futures::future::BoxFuture; use polkadot_primitives::v1::Hash; use async_trait::async_trait; +use smallvec::SmallVec; use crate::messages::AllMessages; @@ -38,13 +39,51 @@ pub mod util; #[cfg(any(test, feature = "test-helpers"))] pub mod test_helpers; +/// How many slots are stack-reserved for active leaves updates +/// +/// If there are fewer than this number of slots, then we've wasted some stack space. +/// If there are greater than this number of slots, then we fall back to a heap vector. +const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8; + +/// Changes in the set of active leaves: the parachain heads which we care to work on. +/// +/// Note that the activated and deactivated fields indicate deltas, not complete sets. +#[derive(Clone, Debug, Default, Eq)] +pub struct ActiveLeavesUpdate { + /// New relay chain block hashes of interest. + pub activated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>, + /// Relay chain block hashes no longer of interest. + pub deactivated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>, +} + +impl ActiveLeavesUpdate { + /// Create a ActiveLeavesUpdate with a single activated hash + pub fn start_work(hash: Hash) -> Self { + Self { activated: [hash].as_ref().into(), ..Default::default() } + } + + /// Create a ActiveLeavesUpdate with a single deactivated hash + pub fn stop_work(hash: Hash) -> Self { + Self { deactivated: [hash].as_ref().into(), ..Default::default() } + } +} + +impl PartialEq for ActiveLeavesUpdate { + /// Equality for `ActiveLeavesUpdate` doesnt imply bitwise equality. + /// + /// Instead, it means equality when `activated` and `deactivated` are considered as sets. + fn eq(&self, other: &Self) -> bool { + use std::collections::HashSet; + self.activated.iter().collect::<HashSet<_>>() == other.activated.iter().collect::<HashSet<_>>() && + self.deactivated.iter().collect::<HashSet<_>>() == other.deactivated.iter().collect::<HashSet<_>>() + } +} + /// Signals sent by an overseer to a subsystem. #[derive(PartialEq, Clone, Debug)] pub enum OverseerSignal { - /// `Subsystem` should start working on block-based work, given by the relay-chain block hash. - StartWork(Hash), - /// `Subsystem` should stop working on block-based work specified by the relay-chain block hash. - StopWork(Hash), + /// Subsystems should adjust their jobs to start and stop work on appropriate block hashes. + ActiveLeaves(ActiveLeavesUpdate), /// Conclude the work of the `Overseer` and all `Subsystem`s. Conclude, } diff --git a/polkadot/node/subsystem/src/util.rs b/polkadot/node/subsystem/src/util.rs index ee5cf2ded287995d177d41998a232fbc809741f1..d37e46ad05c0c2ed67aed9734f396e2e36e34b3b 100644 --- a/polkadot/node/subsystem/src/util.rs +++ b/polkadot/node/subsystem/src/util.rs @@ -602,21 +602,25 @@ where err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>> ) -> bool { use crate::FromOverseer::{Communication, Signal}; - use crate::OverseerSignal::{Conclude, StartWork, StopWork}; + use crate::ActiveLeavesUpdate; + use crate::OverseerSignal::{Conclude, ActiveLeaves}; match incoming { - Ok(Signal(StartWork(hash))) => { - if let Err(e) = jobs.spawn_job(hash, run_args.clone()) { - log::error!("Failed to spawn a job: {:?}", e); - Self::fwd_err(Some(hash), e.into(), err_tx).await; - return true; + Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }))) => { + for hash in activated { + if let Err(e) = jobs.spawn_job(hash, run_args.clone()) { + log::error!("Failed to spawn a job: {:?}", e); + Self::fwd_err(Some(hash), e.into(), err_tx).await; + return true; + } } - } - Ok(Signal(StopWork(hash))) => { - if let Err(e) = jobs.stop_job(hash).await { - log::error!("Failed to stop a job: {:?}", e); - Self::fwd_err(Some(hash), e.into(), err_tx).await; - return true; + + for hash in deactivated { + if let Err(e) = jobs.stop_job(hash).await { + log::error!("Failed to stop a job: {:?}", e); + Self::fwd_err(Some(hash), e.into(), err_tx).await; + return true; + } } } Ok(Signal(Conclude)) => { @@ -725,6 +729,7 @@ mod tests { JobTrait, ToJobTrait, }, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, }; @@ -920,12 +925,12 @@ mod tests { run_args.insert(relay_parent.clone(), vec![FromJob::Test(test_message.clone())]); test_harness(run_args, |mut overseer_handle, err_rx| async move { - overseer_handle.send(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent))).await; + overseer_handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(relay_parent)))).await; assert_matches!( overseer_handle.recv().await, AllMessages::Test(msg) if msg == test_message ); - overseer_handle.send(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))).await; + overseer_handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(relay_parent)))).await; let errs: Vec<_> = err_rx.collect().await; assert_eq!(errs.len(), 0); @@ -938,7 +943,7 @@ mod tests { let run_args = HashMap::new(); test_harness(run_args, |mut overseer_handle, err_rx| async move { - overseer_handle.send(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent))).await; + overseer_handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(relay_parent)))).await; let errs: Vec<_> = err_rx.collect().await; assert_eq!(errs.len(), 1);