From 5952e790fad5a7cf9cb3995718387cd37333bcd5 Mon Sep 17 00:00:00 2001 From: Robert Habermeier <rphmeier@gmail.com> Date: Sun, 28 Mar 2021 17:55:10 +0200 Subject: [PATCH] Overseer: subsystems communicate directly (#2227) * overseer: pass messages directly between subsystems * test that message is held on to * Update node/overseer/src/lib.rs Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com> * give every subsystem an unbounded sender too * remove metered_channel::name 1. we don't provide good names 2. these names are never used anywhere * unused mut * remove unnecessary &mut * subsystem unbounded_send * remove unused MaybeTimer We have channel size metrics that serve the same purpose better now and the implementation of message timing was pretty ugly. * remove comment * split up senders and receivers * update metrics * fix tests * fix test subsystem context * fix flaky test * fix docs * doc * use select_biased to favor signals * Update node/subsystem/src/lib.rs Co-authored-by: Andronik Ordian <write@reusable.software> Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com> Co-authored-by: Andronik Ordian <write@reusable.software> --- polkadot/Cargo.lock | 8 +- polkadot/node/metered-channel/src/bounded.rs | 5 +- polkadot/node/metered-channel/src/lib.rs | 17 +- .../node/metered-channel/src/unbounded.rs | 7 +- polkadot/node/network/bridge/src/lib.rs | 2 +- polkadot/node/overseer/Cargo.toml | 3 +- polkadot/node/overseer/src/lib.rs | 3041 ++++++++++------- .../node/subsystem-test-helpers/src/lib.rs | 58 +- polkadot/node/subsystem/src/lib.rs | 52 +- 9 files changed, 1928 insertions(+), 1265 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 3ed9d4ca2b2..09cdbe1930a 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4236,12 +4236,6 @@ dependencies = [ "parking_lot 0.11.1", ] -[[package]] -name = "oorandom" -version = "11.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" - [[package]] name = "opaque-debug" version = "0.2.3" @@ -5886,12 +5880,12 @@ dependencies = [ name = "polkadot-overseer" version = "0.1.0" dependencies = [ + "assert_matches", "async-trait", "femme", "futures 0.3.13", "futures-timer 3.0.2", "kv-log-macro", - "oorandom", "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", diff --git a/polkadot/node/metered-channel/src/bounded.rs b/polkadot/node/metered-channel/src/bounded.rs index 66ecf302026..5ad1fae4205 100644 --- a/polkadot/node/metered-channel/src/bounded.rs +++ b/polkadot/node/metered-channel/src/bounded.rs @@ -25,10 +25,9 @@ use super::Meter; /// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. -pub fn channel<T>(capacity: usize, name: &'static str) -> (MeteredSender<T>, MeteredReceiver<T>) { +pub fn channel<T>(capacity: usize) -> (MeteredSender<T>, MeteredReceiver<T>) { let (tx, rx) = mpsc::channel(capacity); - let mut shared_meter = Meter::default(); - shared_meter.name = name; + let shared_meter = Meter::default(); let tx = MeteredSender { meter: shared_meter.clone(), inner: tx }; let rx = MeteredReceiver { meter: shared_meter, inner: rx }; (tx, rx) diff --git a/polkadot/node/metered-channel/src/lib.rs b/polkadot/node/metered-channel/src/lib.rs index 2329e164823..e2fc0d84b50 100644 --- a/polkadot/node/metered-channel/src/lib.rs +++ b/polkadot/node/metered-channel/src/lib.rs @@ -30,8 +30,6 @@ pub use self::unbounded::*; /// A peek into the inner state of a meter. #[derive(Debug, Clone, Default)] pub struct Meter { - /// Name of the receiver and sender pair. - name: &'static str, // Number of sends on this channel. sent: Arc<AtomicUsize>, // Number of receives on this channel. @@ -60,11 +58,6 @@ impl Meter { } } - /// Obtain the name of the channel `Sender` and `Receiver` pair. 
- pub fn name(&self) -> &'static str { - self.name - } - fn note_sent(&self) { self.sent.fetch_add(1, Ordering::Relaxed); } @@ -92,7 +85,7 @@ mod tests { #[test] fn try_send_try_next() { block_on(async move { - let (mut tx, mut rx) = channel::<Msg>(5, "goofy"); + let (mut tx, mut rx) = channel::<Msg>(5); let msg = Msg::default(); assert_eq!(rx.meter().read(), Readout { sent: 0, received: 0 }); tx.try_send(msg).unwrap(); @@ -116,7 +109,7 @@ mod tests { fn with_tasks() { let (ready, go) = futures::channel::oneshot::channel(); - let (mut tx, mut rx) = channel::<Msg>(5, "goofy"); + let (mut tx, mut rx) = channel::<Msg>(5); block_on(async move { futures::join!( async move { @@ -149,7 +142,7 @@ mod tests { #[test] fn stream_and_sink() { - let (mut tx, mut rx) = channel::<Msg>(5, "goofy"); + let (mut tx, mut rx) = channel::<Msg>(5); block_on(async move { futures::join!( @@ -175,8 +168,8 @@ mod tests { #[test] fn failed_send_does_not_inc_sent() { - let (mut bounded, _) = channel::<Msg>(5, "pluto"); - let (mut unbounded, _) = unbounded::<Msg>("pluto"); + let (mut bounded, _) = channel::<Msg>(5); + let (mut unbounded, _) = unbounded::<Msg>(); block_on(async move { assert!(bounded.send(Msg::default()).await.is_err()); diff --git a/polkadot/node/metered-channel/src/unbounded.rs b/polkadot/node/metered-channel/src/unbounded.rs index 1d98b18dbe1..242b9198f4d 100644 --- a/polkadot/node/metered-channel/src/unbounded.rs +++ b/polkadot/node/metered-channel/src/unbounded.rs @@ -25,10 +25,9 @@ use super::Meter; /// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. 
-pub fn unbounded<T>(name: &'static str) -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) { +pub fn unbounded<T>() -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) { let (tx, rx) = mpsc::unbounded(); - let mut shared_meter = Meter::default(); - shared_meter.name = name; + let shared_meter = Meter::default(); let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx }; let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx }; (tx, rx) @@ -147,7 +146,7 @@ impl<T> UnboundedMeteredSender<T> { /// Attempt to send message or fail immediately. - pub fn unbounded_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> { + pub fn unbounded_send(&self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> { self.meter.note_sent(); self.inner.unbounded_send(msg).map_err(|e| { self.meter.retract_sent(); diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 6c79773bc15..62dc5d81721 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -734,7 +734,7 @@ mod tests { TestAuthorityDiscovery, ) { let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink(); - let (action_tx, action_rx) = metered::unbounded("test_action"); + let (action_tx, action_rx) = metered::unbounded(); ( TestNetwork { diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml index 9a3ef00c640..1c0401a5361 100644 --- a/polkadot/node/overseer/Cargo.toml +++ b/polkadot/node/overseer/Cargo.toml @@ -9,7 +9,6 @@ async-trait = "0.1.42" client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } futures = "0.3.12" futures-timer = "3.0.2" -oorandom = "11.1.3" polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-primitives = { path = "../../primitives" } @@ -20,6 +19,6 @@ 
tracing = "0.1.25" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-node-network-protocol = { path = "../network/protocol" } futures = { version = "0.3.12", features = ["thread-pool"] } -futures-timer = "3.0.2" femme = "2.1.1" kv-log-macro = "1.0.7" +assert_matches = "1.4.0" diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 1661b6dd3d3..214e085f1d2 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -61,20 +61,19 @@ use std::fmt::{self, Debug}; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{atomic::{self, AtomicUsize}, Arc}; use std::task::Poll; use std::time::Duration; use std::collections::{hash_map, HashMap}; -use futures::channel::{oneshot, mpsc}; +use futures::channel::{oneshot}; use futures::{ poll, select, future::BoxFuture, - stream::{FuturesUnordered, Fuse}, + stream::{self, FuturesUnordered, Fuse}, Future, FutureExt, StreamExt, }; use futures_timer::Delay; -use oorandom::Rand32; use polkadot_primitives::v1::{Block, BlockNumber, Hash}; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; @@ -89,8 +88,8 @@ use polkadot_subsystem::messages::{ ApprovalVotingMessage, GossipSupportMessage, }; pub use polkadot_subsystem::{ - Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, - SpawnedSubsystem, ActiveLeavesUpdate, ActivatedLeaf, DummySubsystem, jaeger, + Subsystem, SubsystemContext, SubsystemSender, OverseerSignal, FromOverseer, SubsystemError, + SubsystemResult, SpawnedSubsystem, ActiveLeavesUpdate, ActivatedLeaf, DummySubsystem, jaeger, }; use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}, metered, Metronome}; use polkadot_node_primitives::SpawnNamed; @@ -101,597 +100,596 @@ const CHANNEL_CAPACITY: usize = 1024; const STOP_DELAY: u64 = 1; // Target for logs. 
const LOG_TARGET: &'static str = "parachain::overseer"; -// Rate at which messages are timed. -const MESSAGE_TIMER_METRIC_CAPTURE_RATE: f64 = 0.005; - +trait MapSubsystem<T> { + type Output; -/// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. -/// -/// It wraps a system-wide [`AllMessages`] type that represents all possible -/// messages in the system. -/// -/// [`AllMessages`]: enum.AllMessages.html -/// [`Subsystem`]: trait.Subsystem.html -/// [`Overseer`]: struct.Overseer.html -enum ToOverseer { - /// This is a message sent by a `Subsystem`. - SubsystemMessage(AllMessages), + fn map_subsystem(&self, sub: T) -> Self::Output; +} - /// A message that wraps something the `Subsystem` is desiring to - /// spawn on the overseer and a `oneshot::Sender` to signal the result - /// of the spawn. - SpawnJob { - name: &'static str, - s: BoxFuture<'static, ()>, - }, +impl<F, T, U> MapSubsystem<T> for F where F: Fn(T) -> U { + type Output = U; - /// Same as `SpawnJob` but for blocking tasks to be executed on a - /// dedicated thread pool. - SpawnBlockingJob { - name: &'static str, - s: BoxFuture<'static, ()>, - }, + fn map_subsystem(&self, sub: T) -> U { + (self)(sub) + } } -/// An event telling the `Overseer` on the particular block -/// that has been imported or finalized. +/// This struct is passed as an argument to create a new instance of an [`Overseer`]. /// -/// This structure exists solely for the purposes of decoupling -/// `Overseer` code from the client code and the necessity to call -/// `HeaderBackend::block_number_from_id()`. +/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows +/// mocking in the test code: +/// +/// Each [`Subsystem`] is supposed to implement some interface that is generic over +/// message type that is specific to this [`Subsystem`]. At the moment not all +/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`]. 
#[derive(Debug, Clone)] -pub struct BlockInfo { - /// hash of the block. - pub hash: Hash, - /// hash of the parent block. - pub parent_hash: Hash, - /// block's number. - pub number: BlockNumber, +pub struct AllSubsystems< + CV = (), CB = (), CS = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (), + RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), ApV = (), + GS = (), +> { + /// A candidate validation subsystem. + pub candidate_validation: CV, + /// A candidate backing subsystem. + pub candidate_backing: CB, + /// A candidate selection subsystem. + pub candidate_selection: CS, + /// A statement distribution subsystem. + pub statement_distribution: SD, + /// An availability distribution subsystem. + pub availability_distribution: AD, + /// An availability recovery subsystem. + pub availability_recovery: AR, + /// A bitfield signing subsystem. + pub bitfield_signing: BS, + /// A bitfield distribution subsystem. + pub bitfield_distribution: BD, + /// A provisioner subsystem. + pub provisioner: P, + /// A runtime API subsystem. + pub runtime_api: RA, + /// An availability store subsystem. + pub availability_store: AS, + /// A network bridge subsystem. + pub network_bridge: NB, + /// A Chain API subsystem. + pub chain_api: CA, + /// A Collation Generation subsystem. + pub collation_generation: CG, + /// A Collator Protocol subsystem. + pub collator_protocol: CP, + /// An Approval Distribution subsystem. + pub approval_distribution: ApD, + /// An Approval Voting subsystem. + pub approval_voting: ApV, + /// A Connection Request Issuer subsystem. 
+ pub gossip_support: GS, } -impl From<BlockImportNotification<Block>> for BlockInfo { - fn from(n: BlockImportNotification<Block>) -> Self { - BlockInfo { - hash: n.hash, - parent_hash: n.header.parent_hash, - number: n.header.number, +impl<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> + AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> +{ + /// Create a new instance of [`AllSubsystems`]. + /// + /// Each subsystem is set to [`DummySystem`]. + /// + ///# Note + /// + /// Because of a bug in rustc it is required that when calling this function, + /// you provide a "random" type for the first generic parameter: + /// + /// ``` + /// polkadot_overseer::AllSubsystems::<()>::dummy(); + /// ``` + pub fn dummy() -> AllSubsystems< + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + > { + AllSubsystems { + candidate_validation: DummySubsystem, + candidate_backing: DummySubsystem, + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + availability_recovery: DummySubsystem, + bitfield_signing: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + chain_api: DummySubsystem, + collation_generation: DummySubsystem, + collator_protocol: DummySubsystem, + approval_distribution: DummySubsystem, + approval_voting: DummySubsystem, + gossip_support: DummySubsystem, } } -} -impl From<FinalityNotification<Block>> for BlockInfo { - fn from(n: FinalityNotification<Block>) -> Self { - BlockInfo { - hash: n.hash, - parent_hash: n.header.parent_hash, 
- number: n.header.number, + /// Replace the `candidate_validation` instance in `self`. + pub fn replace_candidate_validation<NEW>( + self, + candidate_validation: NEW, + ) -> AllSubsystems<NEW, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } -} - -/// Some event from the outer world. -enum Event { - BlockImported(BlockInfo), - BlockFinalized(BlockInfo), - MsgToSubsystem(AllMessages), - ExternalRequest(ExternalRequest), - Stop, -} - -/// Some request from outer world. -enum ExternalRequest { - WaitForActivation { - hash: Hash, - response_channel: oneshot::Sender<SubsystemResult<()>>, - }, -} - -/// A handler used to communicate with the [`Overseer`]. -/// -/// [`Overseer`]: struct.Overseer.html -#[derive(Clone)] -pub struct OverseerHandler { - events_tx: metered::MeteredSender<Event>, -} - -impl OverseerHandler { - /// Inform the `Overseer` that that some block was imported. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - pub async fn block_imported(&mut self, block: BlockInfo) { - self.send_and_log_error(Event::BlockImported(block)).await - } - /// Send some message to one of the `Subsystem`s. 
- #[tracing::instrument(level = "trace", skip(self, msg), fields(subsystem = LOG_TARGET))] - pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) { - self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await - } - - /// Inform the `Overseer` that some block was finalized. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - pub async fn block_finalized(&mut self, block: BlockInfo) { - self.send_and_log_error(Event::BlockFinalized(block)).await + /// Replace the `candidate_backing` instance in `self`. + pub fn replace_candidate_backing<NEW>( + self, + candidate_backing: NEW, + ) -> AllSubsystems<CV, NEW, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, + } } - /// Wait for a block with the given hash to be in the active-leaves set. - /// This method is used for external code like `Proposer` that doesn't subscribe to Overseer's signals. - /// - /// The response channel responds if the hash was activated and is closed if the hash was deactivated. 
- /// Note that due the fact the overseer doesn't store the whole active-leaves set, only deltas, - /// the response channel may never return if the hash was deactivated before this call. - /// In this case, it's the caller's responsibility to ensure a timeout is set. - #[tracing::instrument(level = "trace", skip(self, response_channel), fields(subsystem = LOG_TARGET))] - pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender<SubsystemResult<()>>) { - self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation { - hash, - response_channel - })).await + /// Replace the `candidate_selection` instance in `self`. + pub fn replace_candidate_selection<NEW>( + self, + candidate_selection: NEW, + ) -> AllSubsystems<CV, CB, NEW, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, + } } - /// Tell `Overseer` to shutdown. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - pub async fn stop(&mut self) { - self.send_and_log_error(Event::Stop).await + /// Replace the `statement_distribution` instance in `self`. 
+ pub fn replace_statement_distribution<NEW>( + self, + statement_distribution: NEW, + ) -> AllSubsystems<CV, CB, CS, NEW, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, + } } - async fn send_and_log_error(&mut self, event: Event) { - if self.events_tx.send(event).await.is_err() { - tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); + /// Replace the `availability_distribution` instance in `self`. 
+ pub fn replace_availability_distribution<NEW>( + self, + availability_distribution: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, NEW, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } -} - -/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding -/// import and finality notifications into the [`OverseerHandler`]. -/// -/// [`Overseer`]: struct.Overseer.html -/// [`OverseerHandler`]: struct.OverseerHandler.html -pub async fn forward_events<P: BlockchainEvents<Block>>( - client: Arc<P>, - mut handler: OverseerHandler, -) { - let mut finality = client.finality_notification_stream(); - let mut imports = client.import_notification_stream(); - loop { - select! { - f = finality.next() => { - match f { - Some(block) => { - handler.block_finalized(block.into()).await; - } - None => break, - } - }, - i = imports.next() => { - match i { - Some(block) => { - handler.block_imported(block.into()).await; - } - None => break, - } - }, - complete => break, + /// Replace the `availability_recovery` instance in `self`. 
+ pub fn replace_availability_recovery<NEW>( + self, + availability_recovery: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, NEW, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } -} -impl Debug for ToOverseer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ToOverseer::SubsystemMessage(msg) => { - write!(f, "OverseerMessage::SubsystemMessage({:?})", msg) - } - ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)"), - ToOverseer::SpawnBlockingJob { .. } => write!(f, "OverseerMessage::SpawnBlocking(..)") + /// Replace the `bitfield_signing` instance in `self`. 
+ pub fn replace_bitfield_signing<NEW>( + self, + bitfield_signing: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, NEW, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } -} - -/// A running instance of some [`Subsystem`]. -/// -/// [`Subsystem`]: trait.Subsystem.html -struct SubsystemInstance<M> { - tx: metered::MeteredSender<FromOverseer<M>>, - name: &'static str, -} -type MaybeTimer = Option<metrics::prometheus::prometheus::HistogramTimer>; - -#[derive(Debug)] -struct MaybeTimed<T> { - timer: MaybeTimer, - t: T, -} - -impl<T> MaybeTimed<T> { - fn into_inner(self) -> T { - self.t + /// Replace the `bitfield_distribution` instance in `self`. 
+ pub fn replace_bitfield_distribution<NEW>( + self, + bitfield_distribution: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, NEW, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, + } } -} -impl<T> From<T> for MaybeTimed<T> { - fn from(t: T) -> Self { - Self { timer: None, t } + /// Replace the `provisioner` instance in `self`. 
+ pub fn replace_provisioner<NEW>( + self, + provisioner: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, NEW, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, + } } -} -/// A context type that is given to the [`Subsystem`] upon spawning. -/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s -/// or to spawn it's [`SubsystemJob`]s. -/// -/// [`Overseer`]: struct.Overseer.html -/// [`Subsystem`]: trait.Subsystem.html -/// [`SubsystemJob`]: trait.SubsystemJob.html -#[derive(Debug)] -pub struct OverseerSubsystemContext<M>{ - rx: metered::MeteredReceiver<FromOverseer<M>>, - tx: metered::UnboundedMeteredSender<MaybeTimed<ToOverseer>>, - metrics: Metrics, - rng: Rand32, - threshold: u32, -} - -impl<M> OverseerSubsystemContext<M> { - /// Create a new `OverseerSubsystemContext`. - /// - /// `increment` determines the initial increment of the internal RNG. - /// The internal RNG is used to determine which messages are timed. - /// - /// `capture_rate` determines what fraction of messages are timed. Its value is clamped - /// to the range `0.0..=1.0`. 
- fn new( - rx: metered::MeteredReceiver<FromOverseer<M>>, - tx: metered::UnboundedMeteredSender<MaybeTimed<ToOverseer>>, - metrics: Metrics, - increment: u64, - mut capture_rate: f64, - ) -> Self { - let rng = Rand32::new_inc(0, increment); - - if capture_rate < 0.0 { - capture_rate = 0.0; - } else if capture_rate > 1.0 { - capture_rate = 1.0; + /// Replace the `runtime_api` instance in `self`. + pub fn replace_runtime_api<NEW>( + self, + runtime_api: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, NEW, AS, NB, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } - let threshold = (capture_rate * u32::MAX as f64) as u32; - - OverseerSubsystemContext { rx, tx, metrics, rng, threshold } - } - - /// Create a new `OverseserSubsystemContext` with no metering. - /// - /// Intended for tests. 
- #[allow(unused)] - fn new_unmetered( - rx: metered::MeteredReceiver<FromOverseer<M>>, - tx: metered::UnboundedMeteredSender<MaybeTimed<ToOverseer>>, - ) -> Self { - let metrics = Metrics::default(); - OverseerSubsystemContext::new(rx, tx, metrics, 0, 0.0) - } - - fn maybe_timed<T>(&mut self, t: T) -> MaybeTimed<T> { - let timer = if self.rng.rand_u32() <= self.threshold { - self.metrics.time_message_hold() - } else { - None - }; - - MaybeTimed { timer, t } } -} - -#[async_trait::async_trait] -impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> { - type Message = M; - async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> { - match poll!(self.rx.next()) { - Poll::Ready(Some(msg)) => Ok(Some(msg)), - Poll::Ready(None) => Err(()), - Poll::Pending => Ok(None), + /// Replace the `availability_store` instance in `self`. + pub fn replace_availability_store<NEW>( + self, + availability_store: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, NEW, NB, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } - async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> { - self.rx.next().await - .ok_or(SubsystemError::Context( - "No more messages in 
rx queue to process" - .to_owned() - )) - } - - async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) - -> SubsystemResult<()> - { - self.send_timed(ToOverseer::SpawnJob { - name, - s, - }).map_err(|s| s.into_send_error().into()) + /// Replace the `network_bridge` instance in `self`. + pub fn replace_network_bridge<NEW>( + self, + network_bridge: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NEW, CA, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, + } } - async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) - -> SubsystemResult<()> - { - self.send_timed(ToOverseer::SpawnBlockingJob { - name, - s, - }).map_err(|s| s.into_send_error().into()) + /// Replace the `chain_api` instance in `self`. 
+ pub fn replace_chain_api<NEW>( + self, + chain_api: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, NEW, CG, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, + } } - async fn send_message(&mut self, msg: AllMessages) { - self.send_and_log_error(ToOverseer::SubsystemMessage(msg)) + /// Replace the `collation_generation` instance in `self`. 
+ pub fn replace_collation_generation<NEW>( + self, + collation_generation: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, NEW, CP, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, + } } - async fn send_messages<T>(&mut self, msgs: T) - where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send - { - for msg in msgs { - self.send_and_log_error(ToOverseer::SubsystemMessage(msg)); + /// Replace the `collator_protocol` instance in `self`. 
+ pub fn replace_collator_protocol<NEW>( + self, + collator_protocol: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, NEW, ApD, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } -} -impl<M> OverseerSubsystemContext<M> { - fn send_and_log_error(&mut self, msg: ToOverseer) { - if self.send_timed(msg).is_err() { - tracing::debug!( - target: LOG_TARGET, - msg_type = std::any::type_name::<M>(), - "Failed to send a message to Overseer", - ); + /// Replace the `approval_distribution` instance in `self`. 
+ pub fn replace_approval_distribution<NEW>( + self, + approval_distribution: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, NEW, ApV, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution, + approval_voting: self.approval_voting, + gossip_support: self.gossip_support, } } - fn send_timed(&mut self, msg: ToOverseer) -> Result< - (), - mpsc::TrySendError<MaybeTimed<ToOverseer>>, - > - { - let msg = self.maybe_timed(msg); - self.tx.unbounded_send(msg) + /// Replace the `approval_voting` instance in `self`. 
+ pub fn replace_approval_voting<NEW>( + self, + approval_voting: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, NEW, GS> { + AllSubsystems { + candidate_validation: self.candidate_validation, + candidate_backing: self.candidate_backing, + candidate_selection: self.candidate_selection, + statement_distribution: self.statement_distribution, + availability_distribution: self.availability_distribution, + availability_recovery: self.availability_recovery, + bitfield_signing: self.bitfield_signing, + bitfield_distribution: self.bitfield_distribution, + provisioner: self.provisioner, + runtime_api: self.runtime_api, + availability_store: self.availability_store, + network_bridge: self.network_bridge, + chain_api: self.chain_api, + collation_generation: self.collation_generation, + collator_protocol: self.collator_protocol, + approval_distribution: self.approval_distribution, + approval_voting, + gossip_support: self.gossip_support, + } } -} -/// A subsystem that we oversee. -/// -/// Ties together the [`Subsystem`] itself and it's running instance -/// (which may be missing if the [`Subsystem`] is not running at the moment -/// for whatever reason). -/// -/// [`Subsystem`]: trait.Subsystem.html -struct OverseenSubsystem<M> { - instance: Option<SubsystemInstance<M>>, -} - -impl<M> OverseenSubsystem<M> { - /// Send a message to the wrapped subsystem. - /// - /// If the inner `instance` is `None`, nothing is happening. 
- async fn send_message(&mut self, msg: M) -> SubsystemResult<()> { - const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10); - - if let Some(ref mut instance) = self.instance { - match instance.tx.send( - FromOverseer::Communication { msg } - ).timeout(MESSAGE_TIMEOUT).await - { - None => { - tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); - Err(SubsystemError::SubsystemStalled(instance.name)) - } - Some(res) => res.map_err(Into::into), - } - } else { - Ok(()) - } - } - - /// Send a signal to the wrapped subsystem. - /// - /// If the inner `instance` is `None`, nothing is happening. - async fn send_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { - const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10); - - if let Some(ref mut instance) = self.instance { - match instance.tx.send(FromOverseer::Signal(signal)).timeout(SIGNAL_TIMEOUT).await { - None => { - tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); - Err(SubsystemError::SubsystemStalled(instance.name)) - } - Some(res) => res.map_err(Into::into), - } - } else { - Ok(()) - } - } -} - -/// The `Overseer` itself. -pub struct Overseer<S> { - /// Handles to all subsystems. 
- subsystems: AllSubsystems< - OverseenSubsystem<CandidateValidationMessage>, - OverseenSubsystem<CandidateBackingMessage>, - OverseenSubsystem<CandidateSelectionMessage>, - OverseenSubsystem<StatementDistributionMessage>, - OverseenSubsystem<AvailabilityDistributionMessage>, - OverseenSubsystem<AvailabilityRecoveryMessage>, - OverseenSubsystem<BitfieldSigningMessage>, - OverseenSubsystem<BitfieldDistributionMessage>, - OverseenSubsystem<ProvisionerMessage>, - OverseenSubsystem<RuntimeApiMessage>, - OverseenSubsystem<AvailabilityStoreMessage>, - OverseenSubsystem<NetworkBridgeMessage>, - OverseenSubsystem<ChainApiMessage>, - OverseenSubsystem<CollationGenerationMessage>, - OverseenSubsystem<CollatorProtocolMessage>, - OverseenSubsystem<ApprovalDistributionMessage>, - OverseenSubsystem<ApprovalVotingMessage>, - OverseenSubsystem<GossipSupportMessage>, - >, - - /// Spawner to spawn tasks to. - s: S, - - /// Here we keep handles to spawned subsystems to be notified when they terminate. - running_subsystems: FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>, - - /// Gather running subsystems' outbound streams into one. - to_overseer_rx: Fuse<metered::UnboundedMeteredReceiver<MaybeTimed<ToOverseer>>>, - - /// Events that are sent to the overseer from the outside world - events_rx: metered::MeteredReceiver<Event>, - - /// External listeners waiting for a hash to be in the active-leave set. - activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>, - - /// Stores the [`jaeger::Span`] per active leaf. - span_per_active_leaf: HashMap<Hash, Arc<jaeger::Span>>, - - /// A set of leaves that `Overseer` starts working with. - /// - /// Drained at the beginning of `run` and never used again. - leaves: Vec<(Hash, BlockNumber)>, - - /// The set of the "active leaves". - active_leaves: HashMap<Hash, BlockNumber>, - - /// Various Prometheus metrics. 
- metrics: Metrics, -} - -trait MapSubsystem<T> { - type Output; - - fn map_subsystem(&self, sub: T) -> Self::Output; -} - -impl<F, T, U> MapSubsystem<T> for F where F: Fn(T) -> U { - type Output = U; - - fn map_subsystem(&self, sub: T) -> U { - (self)(sub) - } -} - -/// This struct is passed as an argument to create a new instance of an [`Overseer`]. -/// -/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows -/// mocking in the test code: -/// -/// Each [`Subsystem`] is supposed to implement some interface that is generic over -/// message type that is specific to this [`Subsystem`]. At the moment not all -/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`]. -pub struct AllSubsystems< - CV = (), CB = (), CS = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (), - RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), ApV = (), - GS = (), -> { - /// A candidate validation subsystem. - pub candidate_validation: CV, - /// A candidate backing subsystem. - pub candidate_backing: CB, - /// A candidate selection subsystem. - pub candidate_selection: CS, - /// A statement distribution subsystem. - pub statement_distribution: SD, - /// An availability distribution subsystem. - pub availability_distribution: AD, - /// An availability recovery subsystem. - pub availability_recovery: AR, - /// A bitfield signing subsystem. - pub bitfield_signing: BS, - /// A bitfield distribution subsystem. - pub bitfield_distribution: BD, - /// A provisioner subsystem. - pub provisioner: P, - /// A runtime API subsystem. - pub runtime_api: RA, - /// An availability store subsystem. - pub availability_store: AS, - /// A network bridge subsystem. - pub network_bridge: NB, - /// A Chain API subsystem. - pub chain_api: CA, - /// A Collation Generation subsystem. - pub collation_generation: CG, - /// A Collator Protocol subsystem. - pub collator_protocol: CP, - /// An Approval Distribution subsystem. 
- pub approval_distribution: ApD, - /// An Approval Voting subsystem. - pub approval_voting: ApV, - /// A Connection Request Issuer subsystem. - pub gossip_support: GS, -} - -impl<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> - AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> -{ - /// Create a new instance of [`AllSubsystems`]. - /// - /// Each subsystem is set to [`DummySystem`]. - /// - ///# Note - /// - /// Because of a bug in rustc it is required that when calling this function, - /// you provide a "random" type for the first generic parameter: - /// - /// ``` - /// polkadot_overseer::AllSubsystems::<()>::dummy(); - /// ``` - pub fn dummy() -> AllSubsystems< - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - DummySubsystem, - > { - AllSubsystems { - candidate_validation: DummySubsystem, - candidate_backing: DummySubsystem, - candidate_selection: DummySubsystem, - statement_distribution: DummySubsystem, - availability_distribution: DummySubsystem, - availability_recovery: DummySubsystem, - bitfield_signing: DummySubsystem, - bitfield_distribution: DummySubsystem, - provisioner: DummySubsystem, - runtime_api: DummySubsystem, - availability_store: DummySubsystem, - network_bridge: DummySubsystem, - chain_api: DummySubsystem, - collation_generation: DummySubsystem, - collator_protocol: DummySubsystem, - approval_distribution: DummySubsystem, - approval_voting: DummySubsystem, - gossip_support: DummySubsystem, - } - } - - /// Replace the `candidate_validation` instance in `self`. - pub fn replace_candidate_validation<NEW>( + /// Replace the `gossip_support` instance in `self`. 
+ pub fn replace_gossip_support<NEW>( self, - candidate_validation: NEW, - ) -> AllSubsystems<NEW, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + gossip_support: NEW, + ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, NEW> { AllSubsystems { - candidate_validation, + candidate_validation: self.candidate_validation, candidate_backing: self.candidate_backing, candidate_selection: self.candidate_selection, statement_distribution: self.statement_distribution, @@ -708,562 +706,864 @@ impl<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> collator_protocol: self.collator_protocol, approval_distribution: self.approval_distribution, approval_voting: self.approval_voting, - gossip_support: self.gossip_support, + gossip_support, } } - /// Replace the `candidate_backing` instance in `self`. - pub fn replace_candidate_backing<NEW>( - self, - candidate_backing: NEW, - ) -> AllSubsystems<CV, NEW, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ CS, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> { AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: 
self.approval_voting, - gossip_support: self.gossip_support, - } - } - - /// Replace the `candidate_selection` instance in `self`. - pub fn replace_candidate_selection<NEW>( - self, - candidate_selection: NEW, - ) -> AllSubsystems<CV, CB, NEW, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, + candidate_validation: &self.candidate_validation, + candidate_backing: &self.candidate_backing, + candidate_selection: &self.candidate_selection, + statement_distribution: &self.statement_distribution, + availability_distribution: &self.availability_distribution, + availability_recovery: &self.availability_recovery, + bitfield_signing: &self.bitfield_signing, + bitfield_distribution: &self.bitfield_distribution, + provisioner: &self.provisioner, + runtime_api: &self.runtime_api, + availability_store: &self.availability_store, + network_bridge: &self.network_bridge, + chain_api: &self.chain_api, + collation_generation: &self.collation_generation, + collator_protocol: &self.collator_protocol, + approval_distribution: &self.approval_distribution, + approval_voting: &self.approval_voting, + gossip_support: &self.gossip_support, } } - /// Replace the 
`statement_distribution` instance in `self`. - pub fn replace_statement_distribution<NEW>( - self, - statement_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, NEW, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { + fn map_subsystems<M>(self, m: M) + -> AllSubsystems< + <M as MapSubsystem<CV>>::Output, + <M as MapSubsystem<CB>>::Output, + <M as MapSubsystem<CS>>::Output, + <M as MapSubsystem<SD>>::Output, + <M as MapSubsystem<AD>>::Output, + <M as MapSubsystem<AR>>::Output, + <M as MapSubsystem<BS>>::Output, + <M as MapSubsystem<BD>>::Output, + <M as MapSubsystem<P>>::Output, + <M as MapSubsystem<RA>>::Output, + <M as MapSubsystem<AS>>::Output, + <M as MapSubsystem<NB>>::Output, + <M as MapSubsystem<CA>>::Output, + <M as MapSubsystem<CG>>::Output, + <M as MapSubsystem<CP>>::Output, + <M as MapSubsystem<ApD>>::Output, + <M as MapSubsystem<ApV>>::Output, + <M as MapSubsystem<GS>>::Output, + > + where + M: MapSubsystem<CV>, + M: MapSubsystem<CB>, + M: MapSubsystem<CS>, + M: MapSubsystem<SD>, + M: MapSubsystem<AD>, + M: MapSubsystem<AR>, + M: MapSubsystem<BS>, + M: MapSubsystem<BD>, + M: MapSubsystem<P>, + M: MapSubsystem<RA>, + M: MapSubsystem<AS>, + M: MapSubsystem<NB>, + M: MapSubsystem<CA>, + M: MapSubsystem<CG>, + M: MapSubsystem<CP>, + M: MapSubsystem<ApD>, + M: MapSubsystem<ApV>, + M: MapSubsystem<GS>, + { AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: 
self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, + candidate_validation: m.map_subsystem(self.candidate_validation), + candidate_backing: m.map_subsystem(self.candidate_backing), + candidate_selection: m.map_subsystem(self.candidate_selection), + statement_distribution: m.map_subsystem(self.statement_distribution), + availability_distribution: m.map_subsystem(self.availability_distribution), + availability_recovery: m.map_subsystem(self.availability_recovery), + bitfield_signing: m.map_subsystem(self.bitfield_signing), + bitfield_distribution: m.map_subsystem(self.bitfield_distribution), + provisioner: m.map_subsystem(self.provisioner), + runtime_api: m.map_subsystem(self.runtime_api), + availability_store: m.map_subsystem(self.availability_store), + network_bridge: m.map_subsystem(self.network_bridge), + chain_api: m.map_subsystem(self.chain_api), + collation_generation: m.map_subsystem(self.collation_generation), + collator_protocol: m.map_subsystem(self.collator_protocol), + approval_distribution: m.map_subsystem(self.approval_distribution), + approval_voting: m.map_subsystem(self.approval_voting), + gossip_support: m.map_subsystem(self.gossip_support), + } + } +} + +type AllSubsystemsSame<T> = AllSubsystems< + T, T, T, T, T, + T, T, T, T, T, + T, T, T, T, T, + T, T, T, +>; + +/// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. +/// +/// It wraps a system-wide [`AllMessages`] type that represents all possible +/// messages in the system. +/// +/// [`AllMessages`]: enum.AllMessages.html +/// [`Subsystem`]: trait.Subsystem.html +/// [`Overseer`]: struct.Overseer.html +enum ToOverseer { + /// A message that wraps something the `Subsystem` is desiring to + /// spawn on the overseer and a `oneshot::Sender` to signal the result + /// of the spawn. 
+ SpawnJob { + name: &'static str, + s: BoxFuture<'static, ()>, + }, + + /// Same as `SpawnJob` but for blocking tasks to be executed on a + /// dedicated thread pool. + SpawnBlockingJob { + name: &'static str, + s: BoxFuture<'static, ()>, + }, +} + +/// An event telling the `Overseer` on the particular block +/// that has been imported or finalized. +/// +/// This structure exists solely for the purposes of decoupling +/// `Overseer` code from the client code and the necessity to call +/// `HeaderBackend::block_number_from_id()`. +#[derive(Debug, Clone)] +pub struct BlockInfo { + /// hash of the block. + pub hash: Hash, + /// hash of the parent block. + pub parent_hash: Hash, + /// block's number. + pub number: BlockNumber, +} + +impl From<BlockImportNotification<Block>> for BlockInfo { + fn from(n: BlockImportNotification<Block>) -> Self { + BlockInfo { + hash: n.hash, + parent_hash: n.header.parent_hash, + number: n.header.number, + } + } +} + +impl From<FinalityNotification<Block>> for BlockInfo { + fn from(n: FinalityNotification<Block>) -> Self { + BlockInfo { + hash: n.hash, + parent_hash: n.header.parent_hash, + number: n.header.number, + } + } +} + +/// Some event from the outer world. +enum Event { + BlockImported(BlockInfo), + BlockFinalized(BlockInfo), + MsgToSubsystem(AllMessages), + ExternalRequest(ExternalRequest), + Stop, +} + +/// Some request from the outer world. +enum ExternalRequest { + WaitForActivation { + hash: Hash, + response_channel: oneshot::Sender<SubsystemResult<()>>, + }, +} + +/// A handler used to communicate with the [`Overseer`]. +/// +/// [`Overseer`]: struct.Overseer.html +#[derive(Clone)] +pub struct OverseerHandler { + events_tx: metered::MeteredSender<Event>, +} + +impl OverseerHandler { + /// Inform the `Overseer` that some block was imported. 
+ #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + pub async fn block_imported(&mut self, block: BlockInfo) { + self.send_and_log_error(Event::BlockImported(block)).await + } + + /// Send some message to one of the `Subsystem`s. + #[tracing::instrument(level = "trace", skip(self, msg), fields(subsystem = LOG_TARGET))] + pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) { + self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await + } + + /// Inform the `Overseer` that some block was finalized. + #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + pub async fn block_finalized(&mut self, block: BlockInfo) { + self.send_and_log_error(Event::BlockFinalized(block)).await + } + + /// Wait for a block with the given hash to be in the active-leaves set. + /// This method is used for external code like `Proposer` that doesn't subscribe to Overseer's signals. + /// + /// The response channel responds if the hash was activated and is closed if the hash was deactivated. + /// Note that due to the fact that the overseer doesn't store the whole active-leaves set, only deltas, + /// the response channel may never return if the hash was deactivated before this call. + /// In this case, it's the caller's responsibility to ensure a timeout is set. + #[tracing::instrument(level = "trace", skip(self, response_channel), fields(subsystem = LOG_TARGET))] + pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender<SubsystemResult<()>>) { + self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation { + hash, + response_channel + })).await + } + + /// Tell `Overseer` to shut down. 
+ #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + pub async fn stop(&mut self) { + self.send_and_log_error(Event::Stop).await + } + + async fn send_and_log_error(&mut self, event: Event) { + if self.events_tx.send(event).await.is_err() { + tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); + } + } +} + +/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding +/// import and finality notifications into the [`OverseerHandler`]. +/// +/// [`Overseer`]: struct.Overseer.html +/// [`OverseerHandler`]: struct.OverseerHandler.html +pub async fn forward_events<P: BlockchainEvents<Block>>( + client: Arc<P>, + mut handler: OverseerHandler, +) { + let mut finality = client.finality_notification_stream(); + let mut imports = client.import_notification_stream(); + + loop { + select! { + f = finality.next() => { + match f { + Some(block) => { + handler.block_finalized(block.into()).await; + } + None => break, + } + }, + i = imports.next() => { + match i { + Some(block) => { + handler.block_imported(block.into()).await; + } + None => break, + } + }, + complete => break, + } + } +} + +impl Debug for ToOverseer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)"), + ToOverseer::SpawnBlockingJob { .. } => write!(f, "OverseerMessage::SpawnBlocking(..)") + } + } +} + +/// A running instance of some [`Subsystem`]. 
+/// +/// [`Subsystem`]: trait.Subsystem.html +struct SubsystemInstance<M> { + tx_signal: metered::MeteredSender<OverseerSignal>, + tx_bounded: metered::MeteredSender<MessagePacket<M>>, + meters: SubsystemMeters, + signals_received: usize, + name: &'static str, +} + +#[derive(Debug)] +struct MessagePacket<T> { + signals_received: usize, + message: T, +} + +fn make_packet<T>(signals_received: usize, message: T) -> MessagePacket<T> { + MessagePacket { + signals_received, + message, + } +} + +// The channels held by every subsystem to communicate with every other subsystem. +#[derive(Debug, Clone)] +struct ChannelsOut { + candidate_validation: metered::MeteredSender<MessagePacket<CandidateValidationMessage>>, + candidate_backing: metered::MeteredSender<MessagePacket<CandidateBackingMessage>>, + candidate_selection: metered::MeteredSender<MessagePacket<CandidateSelectionMessage>>, + statement_distribution: metered::MeteredSender<MessagePacket<StatementDistributionMessage>>, + availability_distribution: metered::MeteredSender<MessagePacket<AvailabilityDistributionMessage>>, + availability_recovery: metered::MeteredSender<MessagePacket<AvailabilityRecoveryMessage>>, + bitfield_signing: metered::MeteredSender<MessagePacket<BitfieldSigningMessage>>, + bitfield_distribution: metered::MeteredSender<MessagePacket<BitfieldDistributionMessage>>, + provisioner: metered::MeteredSender<MessagePacket<ProvisionerMessage>>, + runtime_api: metered::MeteredSender<MessagePacket<RuntimeApiMessage>>, + availability_store: metered::MeteredSender<MessagePacket<AvailabilityStoreMessage>>, + network_bridge: metered::MeteredSender<MessagePacket<NetworkBridgeMessage>>, + chain_api: metered::MeteredSender<MessagePacket<ChainApiMessage>>, + collation_generation: metered::MeteredSender<MessagePacket<CollationGenerationMessage>>, + collator_protocol: metered::MeteredSender<MessagePacket<CollatorProtocolMessage>>, + approval_distribution: 
metered::MeteredSender<MessagePacket<ApprovalDistributionMessage>>, + approval_voting: metered::MeteredSender<MessagePacket<ApprovalVotingMessage>>, + gossip_support: metered::MeteredSender<MessagePacket<GossipSupportMessage>>, + + candidate_validation_unbounded: metered::UnboundedMeteredSender<MessagePacket<CandidateValidationMessage>>, + candidate_backing_unbounded: metered::UnboundedMeteredSender<MessagePacket<CandidateBackingMessage>>, + candidate_selection_unbounded: metered::UnboundedMeteredSender<MessagePacket<CandidateSelectionMessage>>, + statement_distribution_unbounded: metered::UnboundedMeteredSender<MessagePacket<StatementDistributionMessage>>, + availability_distribution_unbounded: metered::UnboundedMeteredSender<MessagePacket<AvailabilityDistributionMessage>>, + availability_recovery_unbounded: metered::UnboundedMeteredSender<MessagePacket<AvailabilityRecoveryMessage>>, + bitfield_signing_unbounded: metered::UnboundedMeteredSender<MessagePacket<BitfieldSigningMessage>>, + bitfield_distribution_unbounded: metered::UnboundedMeteredSender<MessagePacket<BitfieldDistributionMessage>>, + provisioner_unbounded: metered::UnboundedMeteredSender<MessagePacket<ProvisionerMessage>>, + runtime_api_unbounded: metered::UnboundedMeteredSender<MessagePacket<RuntimeApiMessage>>, + availability_store_unbounded: metered::UnboundedMeteredSender<MessagePacket<AvailabilityStoreMessage>>, + network_bridge_unbounded: metered::UnboundedMeteredSender<MessagePacket<NetworkBridgeMessage>>, + chain_api_unbounded: metered::UnboundedMeteredSender<MessagePacket<ChainApiMessage>>, + collation_generation_unbounded: metered::UnboundedMeteredSender<MessagePacket<CollationGenerationMessage>>, + collator_protocol_unbounded: metered::UnboundedMeteredSender<MessagePacket<CollatorProtocolMessage>>, + approval_distribution_unbounded: metered::UnboundedMeteredSender<MessagePacket<ApprovalDistributionMessage>>, + approval_voting_unbounded: 
metered::UnboundedMeteredSender<MessagePacket<ApprovalVotingMessage>>, + gossip_support_unbounded: metered::UnboundedMeteredSender<MessagePacket<GossipSupportMessage>>, +} + +impl ChannelsOut { + async fn send_and_log_error( + &mut self, + signals_received: usize, + message: AllMessages, + ) { + let res = match message { + AllMessages::CandidateValidation(msg) => { + self.candidate_validation.send(make_packet(signals_received, msg)).await + }, + AllMessages::CandidateBacking(msg) => { + self.candidate_backing.send(make_packet(signals_received, msg)).await + }, + AllMessages::CandidateSelection(msg) => { + self.candidate_selection.send(make_packet(signals_received, msg)).await + }, + AllMessages::StatementDistribution(msg) => { + self.statement_distribution.send(make_packet(signals_received, msg)).await + }, + AllMessages::AvailabilityDistribution(msg) => { + self.availability_distribution.send(make_packet(signals_received, msg)).await + }, + AllMessages::AvailabilityRecovery(msg) => { + self.availability_recovery.send(make_packet(signals_received, msg)).await + }, + AllMessages::BitfieldDistribution(msg) => { + self.bitfield_distribution.send(make_packet(signals_received, msg)).await + }, + AllMessages::BitfieldSigning(msg) => { + self.bitfield_signing.send(make_packet(signals_received, msg)).await + }, + AllMessages::Provisioner(msg) => { + self.provisioner.send(make_packet(signals_received, msg)).await + }, + AllMessages::RuntimeApi(msg) => { + self.runtime_api.send(make_packet(signals_received, msg)).await + }, + AllMessages::AvailabilityStore(msg) => { + self.availability_store.send(make_packet(signals_received, msg)).await + }, + AllMessages::NetworkBridge(msg) => { + self.network_bridge.send(make_packet(signals_received, msg)).await + }, + AllMessages::ChainApi(msg) => { + self.chain_api.send(make_packet(signals_received, msg)).await + }, + AllMessages::CollationGeneration(msg) => { + self.collation_generation.send(make_packet(signals_received, msg)).await + 
}, + AllMessages::CollatorProtocol(msg) => { + self.collator_protocol.send(make_packet(signals_received, msg)).await + }, + AllMessages::ApprovalDistribution(msg) => { + self.approval_distribution.send(make_packet(signals_received, msg)).await + }, + AllMessages::ApprovalVoting(msg) => { + self.approval_voting.send(make_packet(signals_received, msg)).await + }, + AllMessages::GossipSupport(msg) => { + self.gossip_support.send(make_packet(signals_received, msg)).await + }, + }; + + if res.is_err() { + tracing::debug!( + target: LOG_TARGET, + "Failed to send a message to another subsystem", + ); } } - /// Replace the `availability_distribution` instance in `self`. - pub fn replace_availability_distribution<NEW>( - self, - availability_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, NEW, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, + + fn send_unbounded_and_log_error( + &self, + signals_received: usize, + message: AllMessages, + ) { + let res = match message { + AllMessages::CandidateValidation(msg) => { + self.candidate_validation_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + 
AllMessages::CandidateBacking(msg) => { + self.candidate_backing_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::CandidateSelection(msg) => { + self.candidate_selection_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::StatementDistribution(msg) => { + self.statement_distribution_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::AvailabilityDistribution(msg) => { + self.availability_distribution_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::AvailabilityRecovery(msg) => { + self.availability_recovery_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::BitfieldDistribution(msg) => { + self.bitfield_distribution_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::BitfieldSigning(msg) => { + self.bitfield_signing_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::Provisioner(msg) => { + self.provisioner_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::RuntimeApi(msg) => { + self.runtime_api_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::AvailabilityStore(msg) => { + self.availability_store_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::NetworkBridge(msg) => { + self.network_bridge_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::ChainApi(msg) => { + self.chain_api_unbounded + 
.unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::CollationGeneration(msg) => { + self.collation_generation_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::CollatorProtocol(msg) => { + self.collator_protocol_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::ApprovalDistribution(msg) => { + self.approval_distribution_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::ApprovalVoting(msg) => { + self.approval_voting_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + AllMessages::GossipSupport(msg) => { + self.gossip_support_unbounded + .unbounded_send(make_packet(signals_received, msg)) + .map_err(|e| e.into_send_error()) + }, + }; + + if res.is_err() { + tracing::debug!( + target: LOG_TARGET, + "Failed to send a message to another subsystem", + ); } } +} - /// Replace the `availability_recovery` instance in `self`. 
- pub fn replace_availability_recovery<NEW>( - self, - availability_recovery: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, NEW, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, - } +type SubsystemIncomingMessages<M> = stream::Select< + metered::MeteredReceiver<MessagePacket<M>>, + metered::UnboundedMeteredReceiver<MessagePacket<M>>, +>; + +#[derive(Debug, Default, Clone)] +struct SignalsReceived(Arc<AtomicUsize>); + +impl SignalsReceived { + fn load(&self) -> usize { + self.0.load(atomic::Ordering::SeqCst) } - /// Replace the `bitfield_signing` instance in `self`. 
- pub fn replace_bitfield_signing<NEW>( - self, - bitfield_signing: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, NEW, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, - } + fn inc(&self) { + self.0.fetch_add(1, atomic::Ordering::SeqCst); } +} - /// Replace the `bitfield_distribution` instance in `self`. 
- pub fn replace_bitfield_distribution<NEW>( - self, - bitfield_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, NEW, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, - } +/// A sender from subsystems to other subsystems. +#[derive(Debug, Clone)] +pub struct OverseerSubsystemSender { + channels: ChannelsOut, + signals_received: SignalsReceived, +} + +#[async_trait::async_trait] +impl SubsystemSender for OverseerSubsystemSender { + async fn send_message(&mut self, msg: AllMessages) { + self.channels.send_and_log_error(self.signals_received.load(), msg).await; } - /// Replace the `provisioner` instance in `self`. 
- pub fn replace_provisioner<NEW>( - self, - provisioner: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, NEW, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, + async fn send_messages<T>(&mut self, msgs: T) + where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send + { + // This can definitely be optimized if necessary. + for msg in msgs { + self.send_message(msg).await; } } - /// Replace the `runtime_api` instance in `self`. 
- pub fn replace_runtime_api<NEW>( - self, - runtime_api: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, NEW, AS, NB, CA, CG, CP, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, - } + fn send_unbounded_message(&mut self, msg: AllMessages) { + self.channels.send_unbounded_and_log_error(self.signals_received.load(), msg); } +} - /// Replace the `availability_store` instance in `self`. 
- pub fn replace_availability_store<NEW>( - self, - availability_store: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, NEW, NB, CA, CG, CP, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, - } +/// A context type that is given to the [`Subsystem`] upon spawning. +/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s +/// or to spawn it's [`SubsystemJob`]s. +/// +/// [`Overseer`]: struct.Overseer.html +/// [`Subsystem`]: trait.Subsystem.html +/// [`SubsystemJob`]: trait.SubsystemJob.html +#[derive(Debug)] +pub struct OverseerSubsystemContext<M>{ + signals: metered::MeteredReceiver<OverseerSignal>, + messages: SubsystemIncomingMessages<M>, + to_subsystems: OverseerSubsystemSender, + to_overseer: metered::UnboundedMeteredSender<ToOverseer>, + signals_received: SignalsReceived, + pending_incoming: Option<(usize, M)>, + metrics: Metrics, +} + +impl<M> OverseerSubsystemContext<M> { + /// Create a new `OverseerSubsystemContext`. 
+ fn new( + signals: metered::MeteredReceiver<OverseerSignal>, + messages: SubsystemIncomingMessages<M>, + to_subsystems: ChannelsOut, + to_overseer: metered::UnboundedMeteredSender<ToOverseer>, + metrics: Metrics, + ) -> Self { + let signals_received = SignalsReceived::default(); + OverseerSubsystemContext { + signals, + messages, + to_subsystems: OverseerSubsystemSender { + channels: to_subsystems, + signals_received: signals_received.clone(), + }, + to_overseer, + signals_received, + pending_incoming: None, + metrics, + } } - /// Replace the `network_bridge` instance in `self`. - pub fn replace_network_bridge<NEW>( - self, - network_bridge: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NEW, CA, CG, CP, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, - } + /// Create a new `OverseserSubsystemContext` with no metering. + /// + /// Intended for tests. 
+ #[allow(unused)] + fn new_unmetered( + signals: metered::MeteredReceiver<OverseerSignal>, + messages: SubsystemIncomingMessages<M>, + to_subsystems: ChannelsOut, + to_overseer: metered::UnboundedMeteredSender<ToOverseer>, + ) -> Self { + let metrics = Metrics::default(); + OverseerSubsystemContext::new(signals, messages, to_subsystems, to_overseer, metrics) } +} - /// Replace the `chain_api` instance in `self`. - pub fn replace_chain_api<NEW>( - self, - chain_api: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, NEW, CG, CP, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, +#[async_trait::async_trait] +impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> { + type Message = M; + type Sender = OverseerSubsystemSender; + + async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> { + match poll!(self.recv()) { + Poll::Ready(msg) => Ok(Some(msg.map_err(|_| ())?)), + Poll::Pending => Ok(None), } } - /// Replace the `collation_generation` instance in `self`. 
- pub fn replace_collation_generation<NEW>( - self, - collation_generation: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, NEW, CP, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, + async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> { + loop { + // If we have a message pending an overseer signal, we only poll for signals + // in the meantime. + if let Some((needs_signals_received, msg)) = self.pending_incoming.take() { + if needs_signals_received <= self.signals_received.load() { + return Ok(FromOverseer::Communication { msg }); + } else { + self.pending_incoming = Some((needs_signals_received, msg)); + + // wait for next signal. + let signal = self.signals.next().await + .ok_or(SubsystemError::Context( + "No more messages in rx queue to process" + .to_owned() + ))?; + + self.signals_received.inc(); + return Ok(FromOverseer::Signal(signal)) + } + } + + let mut await_message = self.messages.next().fuse(); + let mut await_signal = self.signals.next().fuse(); + let signals_received = self.signals_received.load(); + let pending_incoming = &mut self.pending_incoming; + + // Otherwise, wait for the next signal or incoming message. 
+ let from_overseer = futures::select_biased! { + signal = await_signal => { + let signal = signal + .ok_or(SubsystemError::Context( + "No more messages in rx queue to process" + .to_owned() + ))?; + + FromOverseer::Signal(signal) + } + msg = await_message => { + let packet = msg + .ok_or(SubsystemError::Context( + "No more messages in rx queue to process" + .to_owned() + ))?; + + if packet.signals_received > signals_received { + // wait until we've received enough signals to return this message. + *pending_incoming = Some((packet.signals_received, packet.message)); + continue; + } else { + // we know enough to return this message. + FromOverseer::Communication { msg: packet.message} + } + } + }; + + if let FromOverseer::Signal(_) = from_overseer { + self.signals_received.inc(); + } + + return Ok(from_overseer); } } - /// Replace the `collator_protocol` instance in `self`. - pub fn replace_collator_protocol<NEW>( - self, - collator_protocol: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, NEW, ApD, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, - } + async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) + -> SubsystemResult<()> + { + 
self.to_overseer.send(ToOverseer::SpawnJob { + name, + s, + }).await.map_err(Into::into) } - /// Replace the `approval_distribution` instance in `self`. - pub fn replace_approval_distribution<NEW>( - self, - approval_distribution: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, NEW, ApV, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution, - approval_voting: self.approval_voting, - gossip_support: self.gossip_support, - } + async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) + -> SubsystemResult<()> + { + self.to_overseer.send(ToOverseer::SpawnBlockingJob { + name, + s, + }).await.map_err(Into::into) } - /// Replace the `approval_voting` instance in `self`. 
- pub fn replace_approval_voting<NEW>( - self, - approval_voting: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, NEW, GS> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting, - gossip_support: self.gossip_support, - } + fn sender(&mut self) -> &mut OverseerSubsystemSender { + &mut self.to_subsystems } +} - /// Replace the `gossip_support` instance in `self`. 
- pub fn replace_gossip_support<NEW>( - self, - gossip_support: NEW, - ) -> AllSubsystems<CV, CB, CS, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, NEW> { - AllSubsystems { - candidate_validation: self.candidate_validation, - candidate_backing: self.candidate_backing, - candidate_selection: self.candidate_selection, - statement_distribution: self.statement_distribution, - availability_distribution: self.availability_distribution, - availability_recovery: self.availability_recovery, - bitfield_signing: self.bitfield_signing, - bitfield_distribution: self.bitfield_distribution, - provisioner: self.provisioner, - runtime_api: self.runtime_api, - availability_store: self.availability_store, - network_bridge: self.network_bridge, - chain_api: self.chain_api, - collation_generation: self.collation_generation, - collator_protocol: self.collator_protocol, - approval_distribution: self.approval_distribution, - approval_voting: self.approval_voting, - gossip_support, +/// A subsystem that we oversee. +/// +/// Ties together the [`Subsystem`] itself and it's running instance +/// (which may be missing if the [`Subsystem`] is not running at the moment +/// for whatever reason). +/// +/// [`Subsystem`]: trait.Subsystem.html +struct OverseenSubsystem<M> { + instance: Option<SubsystemInstance<M>>, +} + +impl<M> OverseenSubsystem<M> { + /// Send a message to the wrapped subsystem. + /// + /// If the inner `instance` is `None`, nothing is happening. 
+ async fn send_message(&mut self, msg: M) -> SubsystemResult<()> { + const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10); + + if let Some(ref mut instance) = self.instance { + match instance.tx_bounded.send(MessagePacket { + signals_received: instance.signals_received, + message: msg.into() + }).timeout(MESSAGE_TIMEOUT).await + { + None => { + tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); + Err(SubsystemError::SubsystemStalled(instance.name)) + } + Some(res) => res.map_err(Into::into), + } + } else { + Ok(()) } } - fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ CS, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> { - AllSubsystems { - candidate_validation: &self.candidate_validation, - candidate_backing: &self.candidate_backing, - candidate_selection: &self.candidate_selection, - statement_distribution: &self.statement_distribution, - availability_distribution: &self.availability_distribution, - availability_recovery: &self.availability_recovery, - bitfield_signing: &self.bitfield_signing, - bitfield_distribution: &self.bitfield_distribution, - provisioner: &self.provisioner, - runtime_api: &self.runtime_api, - availability_store: &self.availability_store, - network_bridge: &self.network_bridge, - chain_api: &self.chain_api, - collation_generation: &self.collation_generation, - collator_protocol: &self.collator_protocol, - approval_distribution: &self.approval_distribution, - approval_voting: &self.approval_voting, - gossip_support: &self.gossip_support, + /// Send a signal to the wrapped subsystem. + /// + /// If the inner `instance` is `None`, nothing is happening. 
+ async fn send_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { + const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10); + + if let Some(ref mut instance) = self.instance { + match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await { + None => { + tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); + Err(SubsystemError::SubsystemStalled(instance.name)) + } + Some(res) => { + let res = res.map_err(Into::into); + if res.is_ok() { + instance.signals_received += 1; + } + res + } + } + } else { + Ok(()) } } +} - fn map_subsystems<M>(self, m: M) - -> AllSubsystems< - <M as MapSubsystem<CV>>::Output, - <M as MapSubsystem<CB>>::Output, - <M as MapSubsystem<CS>>::Output, - <M as MapSubsystem<SD>>::Output, - <M as MapSubsystem<AD>>::Output, - <M as MapSubsystem<AR>>::Output, - <M as MapSubsystem<BS>>::Output, - <M as MapSubsystem<BD>>::Output, - <M as MapSubsystem<P>>::Output, - <M as MapSubsystem<RA>>::Output, - <M as MapSubsystem<AS>>::Output, - <M as MapSubsystem<NB>>::Output, - <M as MapSubsystem<CA>>::Output, - <M as MapSubsystem<CG>>::Output, - <M as MapSubsystem<CP>>::Output, - <M as MapSubsystem<ApD>>::Output, - <M as MapSubsystem<ApV>>::Output, - <M as MapSubsystem<GS>>::Output, - > - where - M: MapSubsystem<CV>, - M: MapSubsystem<CB>, - M: MapSubsystem<CS>, - M: MapSubsystem<SD>, - M: MapSubsystem<AD>, - M: MapSubsystem<AR>, - M: MapSubsystem<BS>, - M: MapSubsystem<BD>, - M: MapSubsystem<P>, - M: MapSubsystem<RA>, - M: MapSubsystem<AS>, - M: MapSubsystem<NB>, - M: MapSubsystem<CA>, - M: MapSubsystem<CG>, - M: MapSubsystem<CP>, - M: MapSubsystem<ApD>, - M: MapSubsystem<ApV>, - M: MapSubsystem<GS>, - { - AllSubsystems { - candidate_validation: m.map_subsystem(self.candidate_validation), - candidate_backing: m.map_subsystem(self.candidate_backing), - candidate_selection: m.map_subsystem(self.candidate_selection), - statement_distribution: m.map_subsystem(self.statement_distribution), - 
availability_distribution: m.map_subsystem(self.availability_distribution), - availability_recovery: m.map_subsystem(self.availability_recovery), - bitfield_signing: m.map_subsystem(self.bitfield_signing), - bitfield_distribution: m.map_subsystem(self.bitfield_distribution), - provisioner: m.map_subsystem(self.provisioner), - runtime_api: m.map_subsystem(self.runtime_api), - availability_store: m.map_subsystem(self.availability_store), - network_bridge: m.map_subsystem(self.network_bridge), - chain_api: m.map_subsystem(self.chain_api), - collation_generation: m.map_subsystem(self.collation_generation), - collator_protocol: m.map_subsystem(self.collator_protocol), - approval_distribution: m.map_subsystem(self.approval_distribution), - approval_voting: m.map_subsystem(self.approval_voting), - gossip_support: m.map_subsystem(self.gossip_support), +#[derive(Clone)] +struct SubsystemMeters { + bounded: metered::Meter, + unbounded: metered::Meter, + signals: metered::Meter, +} + +impl SubsystemMeters { + fn read(&self) -> SubsystemMeterReadouts { + SubsystemMeterReadouts { + bounded: self.bounded.read(), + unbounded: self.unbounded.read(), + signals: self.signals.read(), } } } -type AllSubsystemsSame<T> = AllSubsystems< - T, T, T, T, T, - T, T, T, T, T, - T, T, T, T, T, - T, T, T, ->; +struct SubsystemMeterReadouts { + bounded: metered::Readout, + unbounded: metered::Readout, + signals: metered::Readout, +} + +/// The `Overseer` itself. +pub struct Overseer<S> { + /// Handles to all subsystems. 
+ subsystems: AllSubsystems< + OverseenSubsystem<CandidateValidationMessage>, + OverseenSubsystem<CandidateBackingMessage>, + OverseenSubsystem<CandidateSelectionMessage>, + OverseenSubsystem<StatementDistributionMessage>, + OverseenSubsystem<AvailabilityDistributionMessage>, + OverseenSubsystem<AvailabilityRecoveryMessage>, + OverseenSubsystem<BitfieldSigningMessage>, + OverseenSubsystem<BitfieldDistributionMessage>, + OverseenSubsystem<ProvisionerMessage>, + OverseenSubsystem<RuntimeApiMessage>, + OverseenSubsystem<AvailabilityStoreMessage>, + OverseenSubsystem<NetworkBridgeMessage>, + OverseenSubsystem<ChainApiMessage>, + OverseenSubsystem<CollationGenerationMessage>, + OverseenSubsystem<CollatorProtocolMessage>, + OverseenSubsystem<ApprovalDistributionMessage>, + OverseenSubsystem<ApprovalVotingMessage>, + OverseenSubsystem<GossipSupportMessage>, + >, + + /// Spawner to spawn tasks to. + s: S, + + /// Here we keep handles to spawned subsystems to be notified when they terminate. + running_subsystems: FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>, + + /// Gather running subsystems' outbound streams into one. + to_overseer_rx: Fuse<metered::UnboundedMeteredReceiver<ToOverseer>>, + + /// Events that are sent to the overseer from the outside world + events_rx: metered::MeteredReceiver<Event>, + + /// External listeners waiting for a hash to be in the active-leave set. + activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>, + + /// Stores the [`jaeger::Span`] per active leaf. + span_per_active_leaf: HashMap<Hash, Arc<jaeger::Span>>, + + /// A set of leaves that `Overseer` starts working with. + /// + /// Drained at the beginning of `run` and never used again. + leaves: Vec<(Hash, BlockNumber)>, + + /// The set of the "active leaves". + active_leaves: HashMap<Hash, BlockNumber>, + + /// Various Prometheus metrics. + metrics: Metrics, +} /// Overseer Prometheus metrics. 
#[derive(Clone)] @@ -1271,11 +1571,12 @@ struct MetricsInner { activated_heads_total: prometheus::Counter<prometheus::U64>, deactivated_heads_total: prometheus::Counter<prometheus::U64>, messages_relayed_total: prometheus::Counter<prometheus::U64>, - message_relay_timings: prometheus::Histogram, - to_overseer_sent: prometheus::Gauge<prometheus::U64>, - to_overseer_received: prometheus::Gauge<prometheus::U64>, - from_overseer_sent: prometheus::GaugeVec<prometheus::U64>, - from_overseer_received: prometheus::GaugeVec<prometheus::U64>, + to_subsystem_bounded_sent: prometheus::GaugeVec<prometheus::U64>, + to_subsystem_bounded_received: prometheus::GaugeVec<prometheus::U64>, + to_subsystem_unbounded_sent: prometheus::GaugeVec<prometheus::U64>, + to_subsystem_unbounded_received: prometheus::GaugeVec<prometheus::U64>, + signals_sent: prometheus::GaugeVec<prometheus::U64>, + signals_received: prometheus::GaugeVec<prometheus::U64>, } #[derive(Default, Clone)] @@ -1300,27 +1601,31 @@ impl Metrics { } } - /// Provide a timer for the duration between receiving a message and passing it to `route_message` - fn time_message_hold(&self) -> MaybeTimer { - self.0.as_ref().map(|metrics| metrics.message_relay_timings.start_timer()) - } - fn channel_fill_level_snapshot( &self, - from_overseer: AllSubsystemsSame<(&'static str, metered::Readout)>, - to_overseer: metered::Readout, + to_subsystem: AllSubsystemsSame<(&'static str, SubsystemMeterReadouts)>, ) { self.0.as_ref().map(|metrics| { - from_overseer.map_subsystems(|(name, readout): (_, metered::Readout)| { - metrics.from_overseer_sent.with_label_values(&[name]) - .set(readout.sent as u64); + to_subsystem.map_subsystems( + |(name, readouts): (_, SubsystemMeterReadouts)| { + metrics.to_subsystem_bounded_sent.with_label_values(&[name]) + .set(readouts.bounded.sent as u64); - metrics.from_overseer_received.with_label_values(&[name]) - .set(readout.received as u64); - }); + metrics.to_subsystem_bounded_received.with_label_values(&[name]) 
+ .set(readouts.bounded.received as u64); + + metrics.to_subsystem_unbounded_sent.with_label_values(&[name]) + .set(readouts.unbounded.sent as u64); + + metrics.to_subsystem_unbounded_received.with_label_values(&[name]) + .set(readouts.unbounded.received as u64); + + metrics.signals_sent.with_label_values(&[name]) + .set(readouts.signals.sent as u64); - metrics.to_overseer_sent.set(to_overseer.sent as u64); - metrics.to_overseer_received.set(to_overseer.received as u64); + metrics.signals_received.with_label_values(&[name]) + .set(readouts.signals.received as u64); + }); }); } } @@ -1349,33 +1654,35 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - message_relay_timings: prometheus::register( - prometheus::Histogram::with_opts( - prometheus::HistogramOpts { - common_opts: prometheus::Opts::new( - "parachain_overseer_messages_relay_timings", - "Time spent holding a message in the overseer before passing it to `route_message`", - ), - // guessing at the desired resolution, but we know that messages will time - // out after 0.5 seconds, so the bucket set below seems plausible: - // `0.0001 * (1.6 ^ 18) ~= 0.472`. Prometheus auto-generates a final bucket - // for all values between the final value and `+Inf`, so this should work. - // - // The documented legal range for the inputs are: - // - // - `> 0.0` - // - `> 1.0` - // - `! 
0` - buckets: prometheus::exponential_buckets(0.0001, 1.6, 18).expect("inputs are within documented range; qed"), - } + to_subsystem_bounded_sent: prometheus::register( + prometheus::GaugeVec::<prometheus::U64>::new( + prometheus::Opts::new( + "parachain_subsystem_bounded_sent", + "Number of elements sent to subsystems' bounded queues", + ), + &[ + "subsystem_name", + ], + )?, + registry, + )?, + to_subsystem_bounded_received: prometheus::register( + prometheus::GaugeVec::<prometheus::U64>::new( + prometheus::Opts::new( + "parachain_subsystem_bounded_received", + "Number of elements received by subsystems' bounded queues", + ), + &[ + "subsystem_name", + ], )?, registry, )?, - from_overseer_sent: prometheus::register( + to_subsystem_unbounded_sent: prometheus::register( prometheus::GaugeVec::<prometheus::U64>::new( prometheus::Opts::new( - "parachain_from_overseer_sent", - "Number of elements sent by the overseer to subsystems", + "parachain_subsystem_unbounded_sent", + "Number of elements sent to subsystems' unbounded queues", ), &[ "subsystem_name", @@ -1383,11 +1690,11 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - from_overseer_received: prometheus::register( + to_subsystem_unbounded_received: prometheus::register( prometheus::GaugeVec::<prometheus::U64>::new( prometheus::Opts::new( - "parachain_from_overseer_received", - "Number of elements received by subsystems from overseer", + "parachain_subsystem_unbounded_received", + "Number of elements received by subsystems' unbounded queues", ), &[ "subsystem_name", @@ -1395,21 +1702,27 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - to_overseer_sent: prometheus::register( - prometheus::Gauge::<prometheus::U64>::with_opts( + signals_sent: prometheus::register( + prometheus::GaugeVec::<prometheus::U64>::new( prometheus::Opts::new( - "parachain_to_overseer_sent", - "Number of elements sent by subsystems to overseer", + "parachain_overseer_signals_sent", + "Number of signals sent by overseer to 
subsystems", ), + &[ + "subsystem_name", + ], )?, registry, )?, - to_overseer_received: prometheus::register( - prometheus::Gauge::<prometheus::U64>::with_opts( + signals_received: prometheus::register( + prometheus::GaugeVec::<prometheus::U64>::new( prometheus::Opts::new( - "parachain_to_overseer_received", - "Number of element received by overseer from subsystems", + "parachain_overseer_signals_received", + "Number of signals received by subsystems from overseer", ), + &[ + "subsystem_name", + ], )?, registry, )?, @@ -1537,7 +1850,7 @@ where ApV: Subsystem<OverseerSubsystemContext<ApprovalVotingMessage>> + Send, GS: Subsystem<OverseerSubsystemContext<GossipSupportMessage>> + Send, { - let (events_tx, events_rx) = metered::channel(CHANNEL_CAPACITY, "overseer_events"); + let (events_tx, events_rx) = metered::channel(CHANNEL_CAPACITY); let handler = OverseerHandler { events_tx: events_tx.clone(), @@ -1545,190 +1858,355 @@ where let metrics = <Metrics as metrics::Metrics>::register(prometheus_registry)?; - let (to_overseer_tx, to_overseer_rx) = metered::unbounded("to_overseer"); + let (to_overseer_tx, to_overseer_rx) = metered::unbounded(); let mut running_subsystems = FuturesUnordered::new(); - let mut seed = 0x533d; // arbitrary + let (candidate_validation_bounded_tx, candidate_validation_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (candidate_backing_bounded_tx, candidate_backing_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (candidate_selection_bounded_tx, candidate_selection_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (statement_distribution_bounded_tx, statement_distribution_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (availability_distribution_bounded_tx, availability_distribution_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (availability_recovery_bounded_tx, availability_recovery_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (bitfield_signing_bounded_tx, bitfield_signing_bounded_rx) 
+ = metered::channel(CHANNEL_CAPACITY); + let (bitfield_distribution_bounded_tx, bitfield_distribution_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (provisioner_bounded_tx, provisioner_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (runtime_api_bounded_tx, runtime_api_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (availability_store_bounded_tx, availability_store_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (network_bridge_bounded_tx, network_bridge_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (chain_api_bounded_tx, chain_api_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (collator_protocol_bounded_tx, collator_protocol_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (collation_generation_bounded_tx, collation_generation_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (approval_distribution_bounded_tx, approval_distribution_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (approval_voting_bounded_tx, approval_voting_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + let (gossip_support_bounded_tx, gossip_support_bounded_rx) + = metered::channel(CHANNEL_CAPACITY); + + let (candidate_validation_unbounded_tx, candidate_validation_unbounded_rx) + = metered::unbounded(); + let (candidate_backing_unbounded_tx, candidate_backing_unbounded_rx) + = metered::unbounded(); + let (candidate_selection_unbounded_tx, candidate_selection_unbounded_rx) + = metered::unbounded(); + let (statement_distribution_unbounded_tx, statement_distribution_unbounded_rx) + = metered::unbounded(); + let (availability_distribution_unbounded_tx, availability_distribution_unbounded_rx) + = metered::unbounded(); + let (availability_recovery_unbounded_tx, availability_recovery_unbounded_rx) + = metered::unbounded(); + let (bitfield_signing_unbounded_tx, bitfield_signing_unbounded_rx) + = metered::unbounded(); + let (bitfield_distribution_unbounded_tx, bitfield_distribution_unbounded_rx) + = 
metered::unbounded(); + let (provisioner_unbounded_tx, provisioner_unbounded_rx) + = metered::unbounded(); + let (runtime_api_unbounded_tx, runtime_api_unbounded_rx) + = metered::unbounded(); + let (availability_store_unbounded_tx, availability_store_unbounded_rx) + = metered::unbounded(); + let (network_bridge_unbounded_tx, network_bridge_unbounded_rx) + = metered::unbounded(); + let (chain_api_unbounded_tx, chain_api_unbounded_rx) + = metered::unbounded(); + let (collator_protocol_unbounded_tx, collator_protocol_unbounded_rx) + = metered::unbounded(); + let (collation_generation_unbounded_tx, collation_generation_unbounded_rx) + = metered::unbounded(); + let (approval_distribution_unbounded_tx, approval_distribution_unbounded_rx) + = metered::unbounded(); + let (approval_voting_unbounded_tx, approval_voting_unbounded_rx) + = metered::unbounded(); + let (gossip_support_unbounded_tx, gossip_support_unbounded_rx) + = metered::unbounded(); + + let channels_out = ChannelsOut { + candidate_validation: candidate_validation_bounded_tx.clone(), + candidate_backing: candidate_backing_bounded_tx.clone(), + candidate_selection: candidate_selection_bounded_tx.clone(), + statement_distribution: statement_distribution_bounded_tx.clone(), + availability_distribution: availability_distribution_bounded_tx.clone(), + availability_recovery: availability_recovery_bounded_tx.clone(), + bitfield_signing: bitfield_signing_bounded_tx.clone(), + bitfield_distribution: bitfield_distribution_bounded_tx.clone(), + provisioner: provisioner_bounded_tx.clone(), + runtime_api: runtime_api_bounded_tx.clone(), + availability_store: availability_store_bounded_tx.clone(), + network_bridge: network_bridge_bounded_tx.clone(), + chain_api: chain_api_bounded_tx.clone(), + collator_protocol: collator_protocol_bounded_tx.clone(), + collation_generation: collation_generation_bounded_tx.clone(), + approval_distribution: approval_distribution_bounded_tx.clone(), + approval_voting: 
approval_voting_bounded_tx.clone(), + gossip_support: gossip_support_bounded_tx.clone(), + + candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(), + candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(), + candidate_selection_unbounded: candidate_selection_unbounded_tx.clone(), + statement_distribution_unbounded: statement_distribution_unbounded_tx.clone(), + availability_distribution_unbounded: availability_distribution_unbounded_tx.clone(), + availability_recovery_unbounded: availability_recovery_unbounded_tx.clone(), + bitfield_signing_unbounded: bitfield_signing_unbounded_tx.clone(), + bitfield_distribution_unbounded: bitfield_distribution_unbounded_tx.clone(), + provisioner_unbounded: provisioner_unbounded_tx.clone(), + runtime_api_unbounded: runtime_api_unbounded_tx.clone(), + availability_store_unbounded: availability_store_unbounded_tx.clone(), + network_bridge_unbounded: network_bridge_unbounded_tx.clone(), + chain_api_unbounded: chain_api_unbounded_tx.clone(), + collator_protocol_unbounded: collator_protocol_unbounded_tx.clone(), + collation_generation_unbounded: collation_generation_unbounded_tx.clone(), + approval_distribution_unbounded: approval_distribution_unbounded_tx.clone(), + approval_voting_unbounded: approval_voting_unbounded_tx.clone(), + gossip_support_unbounded: gossip_support_unbounded_tx.clone(), + }; let candidate_validation_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + candidate_validation_bounded_tx, + stream::select(candidate_validation_bounded_rx, candidate_validation_unbounded_rx), + candidate_validation_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.candidate_validation, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let candidate_backing_subsystem = spawn( &mut s, - &mut running_subsystems, - 
metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + candidate_backing_bounded_tx, + stream::select(candidate_backing_bounded_rx, candidate_backing_unbounded_rx), + candidate_backing_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.candidate_backing, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let candidate_selection_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + candidate_selection_bounded_tx, + stream::select(candidate_selection_bounded_rx, candidate_selection_unbounded_rx), + candidate_selection_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.candidate_selection, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let statement_distribution_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + statement_distribution_bounded_tx, + stream::select(statement_distribution_bounded_rx, statement_distribution_unbounded_rx), + candidate_validation_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.statement_distribution, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let availability_distribution_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + availability_distribution_bounded_tx, + stream::select(availability_distribution_bounded_rx, availability_distribution_unbounded_rx), + availability_distribution_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.availability_distribution, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let availability_recovery_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + 
availability_recovery_bounded_tx, + stream::select(availability_recovery_bounded_rx, availability_recovery_unbounded_rx), + availability_recovery_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.availability_recovery, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let bitfield_signing_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + bitfield_signing_bounded_tx, + stream::select(bitfield_signing_bounded_rx, bitfield_signing_unbounded_rx), + bitfield_signing_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.bitfield_signing, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let bitfield_distribution_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + bitfield_distribution_bounded_tx, + stream::select(bitfield_distribution_bounded_rx, bitfield_distribution_unbounded_rx), + bitfield_distribution_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.bitfield_distribution, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let provisioner_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + provisioner_bounded_tx, + stream::select(provisioner_bounded_rx, provisioner_unbounded_rx), + provisioner_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.provisioner, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let runtime_api_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + runtime_api_bounded_tx, + stream::select(runtime_api_bounded_rx, runtime_api_unbounded_rx), + runtime_api_unbounded_tx.meter().clone(), + channels_out.clone(), + 
to_overseer_tx.clone(), all_subsystems.runtime_api, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let availability_store_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + availability_store_bounded_tx, + stream::select(availability_store_bounded_rx, availability_store_unbounded_rx), + availability_store_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.availability_store, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Blocking, )?; let network_bridge_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + network_bridge_bounded_tx, + stream::select(network_bridge_bounded_rx, network_bridge_unbounded_rx), + network_bridge_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.network_bridge, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let chain_api_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + chain_api_bounded_tx, + stream::select(chain_api_bounded_rx, chain_api_unbounded_rx), + chain_api_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.chain_api, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Blocking, )?; let collation_generation_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + collation_generation_bounded_tx, + stream::select(collation_generation_bounded_rx, collation_generation_unbounded_rx), + collation_generation_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.collation_generation, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; - let collator_protocol_subsystem = spawn( &mut s, - &mut running_subsystems, 
- metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + collator_protocol_bounded_tx, + stream::select(collator_protocol_bounded_rx, collator_protocol_unbounded_rx), + collator_protocol_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.collator_protocol, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let approval_distribution_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + approval_distribution_bounded_tx, + stream::select(approval_distribution_bounded_rx, approval_distribution_unbounded_rx), + approval_distribution_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.approval_distribution, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; let approval_voting_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + approval_voting_bounded_tx, + stream::select(approval_voting_bounded_rx, approval_voting_unbounded_rx), + approval_voting_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.approval_voting, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Blocking, )?; let gossip_support_subsystem = spawn( &mut s, - &mut running_subsystems, - metered::UnboundedMeteredSender::<_>::clone(&to_overseer_tx), + gossip_support_bounded_tx, + stream::select(gossip_support_bounded_rx, gossip_support_unbounded_rx), + gossip_support_unbounded_tx.meter().clone(), + channels_out.clone(), + to_overseer_tx.clone(), all_subsystems.gossip_support, &metrics, - &mut seed, + &mut running_subsystems, TaskKind::Regular, )?; @@ -1762,35 +2240,33 @@ where }; { - struct ExtractNameAndMeter; - impl<'a, T: 'a> MapSubsystem<&'a OverseenSubsystem<T>> for ExtractNameAndMeter { - type Output = (&'static str, metered::Meter); + struct ExtractNameAndMeters; + impl<'a, T: 'a> 
MapSubsystem<&'a OverseenSubsystem<T>> for ExtractNameAndMeters { + type Output = (&'static str, SubsystemMeters); fn map_subsystem(&self, subsystem: &'a OverseenSubsystem<T>) -> Self::Output { let instance = subsystem.instance.as_ref() .expect("Extraction is done directly after spawning when subsystems\ have not concluded; qed"); - (instance.name, instance.tx.meter().clone()) + ( + instance.name, + instance.meters.clone(), + ) } } - let meter_external_to_overseer = events_rx.meter().clone(); - let meter_subsystem_to_overseer = to_overseer_rx.meter().clone(); - let subsystem_meters = subsystems.as_ref().map_subsystems(ExtractNameAndMeter); + let subsystem_meters = subsystems.as_ref().map_subsystems(ExtractNameAndMeters); let metronome_metrics = metrics.clone(); let metronome = Metronome::new(std::time::Duration::from_millis(950)) .for_each(move |_| { - let to_subsystem_counts = subsystem_meters.as_ref() - .map_subsystems(|&(name, ref meter): &(_, metered::Meter)| (name, meter.read())); + let subsystem_meters = subsystem_meters.as_ref() + .map_subsystems(|&(name, ref meters): &(_, SubsystemMeters)| (name, meters.read())); // We combine the amount of messages from subsystems to the overseer // as well as the amount of messages from external sources to the overseer // into one to_overseer value. - metronome_metrics.channel_fill_level_snapshot( - to_subsystem_counts, - meter_subsystem_to_overseer.read() + meter_external_to_overseer.read(), - ); + metronome_metrics.channel_fill_level_snapshot(subsystem_meters); async move { () @@ -1899,7 +2375,7 @@ where } }, msg = self.to_overseer_rx.next() => { - let MaybeTimed { timer, t: msg } = match msg { + let msg = match msg { Some(m) => m, None => { // This is a fused stream so we will shut down after receiving all @@ -1909,10 +2385,6 @@ where }; match msg { - ToOverseer::SubsystemMessage(msg) => { - let msg = MaybeTimed { timer, t: msg }; - self.route_message(msg).await? 
- }, ToOverseer::SpawnJob { name, s } => { self.spawn_job(name, s); } @@ -2018,8 +2490,7 @@ where } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - async fn route_message(&mut self, msg: MaybeTimed<AllMessages>) -> SubsystemResult<()> { - let msg = msg.into_inner(); + async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { self.metrics.on_message_relayed(); match msg { AllMessages::CandidateValidation(msg) => { @@ -2148,33 +2619,33 @@ enum TaskKind { fn spawn<S: SpawnNamed, M: Send + 'static>( spawner: &mut S, - futures: &mut FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>, - to_overseer: metered::UnboundedMeteredSender<MaybeTimed<ToOverseer>>, + message_tx: metered::MeteredSender<MessagePacket<M>>, + message_rx: SubsystemIncomingMessages<M>, + unbounded_meter: metered::Meter, + to_subsystems: ChannelsOut, + to_overseer_tx: metered::UnboundedMeteredSender<ToOverseer>, s: impl Subsystem<OverseerSubsystemContext<M>>, metrics: &Metrics, - seed: &mut u64, + futures: &mut FuturesUnordered<BoxFuture<'static, SubsystemResult<()>>>, task_kind: TaskKind, ) -> SubsystemResult<OverseenSubsystem<M>> { - let (to_tx, to_rx) = metered::channel(CHANNEL_CAPACITY, "subsystem_spawn"); + let (signal_tx, signal_rx) = metered::channel(CHANNEL_CAPACITY); let ctx = OverseerSubsystemContext::new( - to_rx, - to_overseer, + signal_rx, + message_rx, + to_subsystems, + to_overseer_tx, metrics.clone(), - *seed, - MESSAGE_TIMER_METRIC_CAPTURE_RATE, ); let SpawnedSubsystem { future, name } = s.start(ctx); - // increment the seed now that it's been used, so the next context will have its own distinct RNG - *seed += 1; - let (tx, rx) = oneshot::channel(); let fut = Box::pin(async move { if let Err(e) = future.await { - tracing::error!(target: LOG_TARGET, subsystem=name, err = ?e, "subsystem exited with error"); + tracing::error!(subsystem=name, err = ?e, "subsystem exited with error"); } else { - tracing::debug!(target: 
LOG_TARGET, subsystem=name, "subsystem exited without an error"); + tracing::debug!(subsystem=name, "subsystem exited without an error"); } let _ = tx.send(()); }); @@ -2187,7 +2658,14 @@ fn spawn<S: SpawnNamed, M: Send + 'static>( futures.push(Box::pin(rx.map(|e| { tracing::warn!(err = ?e, "dropping error"); Ok(()) }))); let instance = Some(SubsystemInstance { - tx: to_tx, + meters: SubsystemMeters { + unbounded: unbounded_meter, + bounded: message_tx.meter().clone(), + signals: signal_tx.meter().clone(), + }, + tx_signal: signal_tx, + tx_bounded: message_tx, + signals_received: 0, name, }); @@ -2209,6 +2687,7 @@ mod tests { use polkadot_node_subsystem_util::metered; use sp_core::crypto::Pair as _; + use assert_matches::assert_matches; use super::*; @@ -2311,8 +2790,8 @@ mod tests { let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { - let (s1_tx, s1_rx) = metered::channel::<usize>(64, "overseer_test"); - let (s2_tx, s2_rx) = metered::channel::<usize>(64, "overseer_test"); + let (s1_tx, s1_rx) = metered::channel::<usize>(64); + let (s2_tx, s2_rx) = metered::channel::<usize>(64); let mut s1_rx = s1_rx.fuse(); let mut s2_rx = s2_rx.fuse(); @@ -2423,9 +2902,6 @@ mod tests { assert_eq!(gather[0].get_name(), "parachain_activated_heads_total"); assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total"); assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total"); - assert_eq!(gather[3].get_name(), "parachain_overseer_messages_relay_timings"); - assert_eq!(gather[4].get_name(), "parachain_to_overseer_received"); - assert_eq!(gather[5].get_name(), "parachain_to_overseer_sent"); let activated = gather[0].get_metric()[0].get_counter().get_value() as u64; let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64; let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64; @@ -2546,8 +3022,8 @@ mod tests { number: 3, }; - let (tx_5, mut rx_5) = metered::channel(64, "overseer_test"); - let 
(tx_6, mut rx_6) = metered::channel(64, "overseer_test"); + let (tx_5, mut rx_5) = metered::channel(64); + let (tx_6, mut rx_6) = metered::channel(64); let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_validation(TestSubsystem5(tx_5)) .replace_candidate_backing(TestSubsystem6(tx_6)); @@ -2648,8 +3124,8 @@ mod tests { number: 3, }; - let (tx_5, mut rx_5) = metered::channel(64, "overseer_test"); - let (tx_6, mut rx_6) = metered::channel(64, "overseer_test"); + let (tx_5, mut rx_5) = metered::channel(64); + let (tx_6, mut rx_6) = metered::channel(64); let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_validation(TestSubsystem5(tx_5)) @@ -2748,7 +3224,7 @@ mod tests { number: 1, }; - let (tx_5, mut rx_5) = metered::channel(64, "overseer_test"); + let (tx_5, mut rx_5) = metered::channel(64); let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_backing(TestSubsystem6(tx_5)); @@ -2960,8 +3436,11 @@ mod tests { // Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly. #[test] fn overseer_all_subsystems_receive_signals_and_messages() { - let spawner = sp_core::testing::TaskExecutor::new(); + const NUM_SUBSYSTEMS: usize = 18; + // -3 for BitfieldSigning, GossipSupport and AvailabilityDistribution + const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 3; + let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { let stop_signals_received = Arc::new(atomic::AtomicUsize::new(0)); let signals_received = Arc::new(atomic::AtomicUsize::new(0)); @@ -3030,22 +3509,158 @@ mod tests { handler.send_msg(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await; handler.send_msg(AllMessages::ApprovalVoting(test_approval_voting_msg())).await; + // Wait until all subsystems have received. Otherwise the messages might race against + // the conclude signal. 
+ loop { + match (&mut overseer_fut).timeout(Duration::from_millis(100)).await { + None => { + let r = msgs_received.load(atomic::Ordering::SeqCst); + if r < NUM_SUBSYSTEMS_MESSAGED { + Delay::new(Duration::from_millis(100)).await; + } else if r > NUM_SUBSYSTEMS_MESSAGED { + panic!("too many messages received??"); + } else { + break + } + } + Some(_) => panic!("exited too early"), + } + } + // send a stop signal to each subsystems handler.stop().await; - select! { - res = overseer_fut => { - const NUM_SUBSYSTEMS: usize = 18; - - assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); - assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); - // -3 for BitfieldSigning, GossipSupport and AvailabilityDistribution - assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 3); + let res = overseer_fut.await; + assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); + assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); + assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS_MESSAGED); - assert!(res.is_ok()); - }, - complete => (), - } + assert!(res.is_ok()); }); } + + #[test] + fn context_holds_onto_message_until_enough_signals_received() { + let (candidate_validation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (candidate_backing_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (candidate_selection_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (statement_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (availability_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (availability_recovery_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (bitfield_signing_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (bitfield_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (provisioner_bounded_tx, _) = 
metered::channel(CHANNEL_CAPACITY); + let (runtime_api_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (availability_store_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (network_bridge_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (chain_api_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (collator_protocol_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (collation_generation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (approval_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (approval_voting_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (gossip_support_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + + let (candidate_validation_unbounded_tx, _) = metered::unbounded(); + let (candidate_backing_unbounded_tx, _) = metered::unbounded(); + let (candidate_selection_unbounded_tx, _) = metered::unbounded(); + let (statement_distribution_unbounded_tx, _) = metered::unbounded(); + let (availability_distribution_unbounded_tx, _) = metered::unbounded(); + let (availability_recovery_unbounded_tx, _) = metered::unbounded(); + let (bitfield_signing_unbounded_tx, _) = metered::unbounded(); + let (bitfield_distribution_unbounded_tx, _) = metered::unbounded(); + let (provisioner_unbounded_tx, _) = metered::unbounded(); + let (runtime_api_unbounded_tx, _) = metered::unbounded(); + let (availability_store_unbounded_tx, _) = metered::unbounded(); + let (network_bridge_unbounded_tx, _) = metered::unbounded(); + let (chain_api_unbounded_tx, _) = metered::unbounded(); + let (collator_protocol_unbounded_tx, _) = metered::unbounded(); + let (collation_generation_unbounded_tx, _) = metered::unbounded(); + let (approval_distribution_unbounded_tx, _) = metered::unbounded(); + let (approval_voting_unbounded_tx, _) = metered::unbounded(); + let (gossip_support_unbounded_tx, _) = metered::unbounded(); + + let channels_out = ChannelsOut { + candidate_validation: 
candidate_validation_bounded_tx.clone(), + candidate_backing: candidate_backing_bounded_tx.clone(), + candidate_selection: candidate_selection_bounded_tx.clone(), + statement_distribution: statement_distribution_bounded_tx.clone(), + availability_distribution: availability_distribution_bounded_tx.clone(), + availability_recovery: availability_recovery_bounded_tx.clone(), + bitfield_signing: bitfield_signing_bounded_tx.clone(), + bitfield_distribution: bitfield_distribution_bounded_tx.clone(), + provisioner: provisioner_bounded_tx.clone(), + runtime_api: runtime_api_bounded_tx.clone(), + availability_store: availability_store_bounded_tx.clone(), + network_bridge: network_bridge_bounded_tx.clone(), + chain_api: chain_api_bounded_tx.clone(), + collator_protocol: collator_protocol_bounded_tx.clone(), + collation_generation: collation_generation_bounded_tx.clone(), + approval_distribution: approval_distribution_bounded_tx.clone(), + approval_voting: approval_voting_bounded_tx.clone(), + gossip_support: gossip_support_bounded_tx.clone(), + + candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(), + candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(), + candidate_selection_unbounded: candidate_selection_unbounded_tx.clone(), + statement_distribution_unbounded: statement_distribution_unbounded_tx.clone(), + availability_distribution_unbounded: availability_distribution_unbounded_tx.clone(), + availability_recovery_unbounded: availability_recovery_unbounded_tx.clone(), + bitfield_signing_unbounded: bitfield_signing_unbounded_tx.clone(), + bitfield_distribution_unbounded: bitfield_distribution_unbounded_tx.clone(), + provisioner_unbounded: provisioner_unbounded_tx.clone(), + runtime_api_unbounded: runtime_api_unbounded_tx.clone(), + availability_store_unbounded: availability_store_unbounded_tx.clone(), + network_bridge_unbounded: network_bridge_unbounded_tx.clone(), + chain_api_unbounded: chain_api_unbounded_tx.clone(), + 
collator_protocol_unbounded: collator_protocol_unbounded_tx.clone(), + collation_generation_unbounded: collation_generation_unbounded_tx.clone(), + approval_distribution_unbounded: approval_distribution_unbounded_tx.clone(), + approval_voting_unbounded: approval_voting_unbounded_tx.clone(), + gossip_support_unbounded: gossip_support_unbounded_tx.clone(), + }; + + let (mut signal_tx, signal_rx) = metered::channel(CHANNEL_CAPACITY); + let (mut bounded_tx, bounded_rx) = metered::channel(CHANNEL_CAPACITY); + let (unbounded_tx, unbounded_rx) = metered::unbounded(); + let (to_overseer_tx, _to_overseer_rx) = metered::unbounded(); + + let mut ctx = OverseerSubsystemContext::<()>::new_unmetered( + signal_rx, + stream::select(bounded_rx, unbounded_rx), + channels_out, + to_overseer_tx, + ); + + assert_eq!(ctx.signals_received.load(), 0); + + let test_fut = async move { + signal_tx.send(OverseerSignal::Conclude).await.unwrap(); + assert_matches!(ctx.recv().await.unwrap(), FromOverseer::Signal(OverseerSignal::Conclude)); + + assert_eq!(ctx.signals_received.load(), 1); + bounded_tx.send(MessagePacket { + signals_received: 2, + message: (), + }).await.unwrap(); + unbounded_tx.unbounded_send(MessagePacket { + signals_received: 2, + message: (), + }).unwrap(); + + match poll!(ctx.recv()) { + Poll::Pending => {} + Poll::Ready(_) => panic!("ready too early"), + }; + + assert!(ctx.pending_incoming.is_some()); + + signal_tx.send(OverseerSignal::Conclude).await.unwrap(); + assert_matches!(ctx.recv().await.unwrap(), FromOverseer::Signal(OverseerSignal::Conclude)); + assert_matches!(ctx.recv().await.unwrap(), FromOverseer::Communication { msg: () }); + assert_matches!(ctx.recv().await.unwrap(), FromOverseer::Communication { msg: () }); + assert!(ctx.pending_incoming.is_none()); + }; + + futures::executor::block_on(test_fut); + } } diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index 71d9631fd8a..76fd86a3f62 100644 --- 
a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -21,7 +21,7 @@ use polkadot_node_subsystem::messages::AllMessages; use polkadot_node_subsystem::{ FromOverseer, SubsystemContext, SubsystemError, SubsystemResult, Subsystem, - SpawnedSubsystem, OverseerSignal, + SpawnedSubsystem, OverseerSignal, SubsystemSender, }; use polkadot_node_subsystem_util::TimeoutExt; @@ -156,9 +156,41 @@ pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) { (SingleItemSink(inner.clone()), SingleItemStream(inner)) } +/// A test subsystem sender. +#[derive(Clone)] +pub struct TestSubsystemSender { + tx: mpsc::UnboundedSender<AllMessages>, +} + +#[async_trait::async_trait] +impl SubsystemSender for TestSubsystemSender { + async fn send_message(&mut self, msg: AllMessages) { + self.tx + .send(msg) + .await + .expect("test overseer no longer live"); + } + + async fn send_messages<T>(&mut self, msgs: T) + where + T: IntoIterator<Item = AllMessages> + Send, + T::IntoIter: Send, + { + let mut iter = stream::iter(msgs.into_iter().map(Ok)); + self.tx + .send_all(&mut iter) + .await + .expect("test overseer no longer live"); + } + + fn send_unbounded_message(&mut self, msg: AllMessages) { + self.tx.unbounded_send(msg).expect("test overseer no longer live"); + } +} + /// A test subsystem context. 
pub struct TestSubsystemContext<M, S> { - tx: mpsc::UnboundedSender<AllMessages>, + tx: TestSubsystemSender, rx: SingleItemStream<FromOverseer<M>>, spawn: S, } @@ -168,6 +200,7 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for TestSubsystemContext<M, S> { type Message = M; + type Sender = TestSubsystemSender; async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> { match poll!(self.rx.next()) { @@ -198,23 +231,8 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext Ok(()) } - async fn send_message(&mut self, msg: AllMessages) { - self.tx - .send(msg) - .await - .expect("test overseer no longer live"); - } - - async fn send_messages<T>(&mut self, msgs: T) - where - T: IntoIterator<Item = AllMessages> + Send, - T::IntoIter: Send, - { - let mut iter = stream::iter(msgs.into_iter().map(Ok)); - self.tx - .send_all(&mut iter) - .await - .expect("test overseer no longer live"); + fn sender(&mut self) -> &mut TestSubsystemSender { + &mut self.tx } } @@ -260,7 +278,7 @@ pub fn make_subsystem_context<M, S>( ( TestSubsystemContext { - tx: all_messages_tx, + tx: TestSubsystemSender { tx: all_messages_tx }, rx: overseer_rx, spawn, }, diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index f7cba512aba..3cc4894a493 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -210,6 +210,27 @@ pub struct SpawnedSubsystem { /// [`SubsystemError`]: struct.SubsystemError.html pub type SubsystemResult<T> = Result<T, SubsystemError>; +/// A sender used by subsystems to communicate with other subsystems. +/// +/// Each clone of this type may add more capacity to the bounded buffer, so clones should +/// be used sparingly. +#[async_trait] +pub trait SubsystemSender: Send + Clone + 'static { + /// Send a direct message to some other `Subsystem`, routed based on message type. 
+ async fn send_message(&mut self, msg: AllMessages); + + /// Send multiple direct messages to other `Subsystem`s, routed based on message type. + async fn send_messages<T>(&mut self, msgs: T) + where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send; + + /// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message + /// type. + /// + /// This function should be used only when there is some other bounding factor on the messages + /// sent with it. Otherwise, it risks a memory leak. + fn send_unbounded_message(&mut self, msg: AllMessages); +} + /// A context type that is given to the [`Subsystem`] upon spawning. /// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s /// or spawn jobs. @@ -217,11 +238,14 @@ pub type SubsystemResult<T> = Result<T, SubsystemError>; /// [`Overseer`]: struct.Overseer.html /// [`SubsystemJob`]: trait.SubsystemJob.html #[async_trait] -pub trait SubsystemContext: Send + 'static { +pub trait SubsystemContext: Send + Sized + 'static { /// The message type of this context. Subsystems launched with this context will expect /// to receive messages of this type. type Message: Send; + /// The message sender type of this context. Clones of the sender should be used sparingly. + type Sender: SubsystemSender; + /// Try to asynchronously receive a message. /// /// This has to be used with caution, if you loop over this without @@ -241,12 +265,34 @@ pub trait SubsystemContext: Send + 'static { s: Pin<Box<dyn Future<Output = ()> + Send>>, ) -> SubsystemResult<()>; + /// Get a mutable reference to the sender. + fn sender(&mut self) -> &mut Self::Sender; + /// Send a direct message to some other `Subsystem`, routed based on message type. - async fn send_message(&mut self, msg: AllMessages); + async fn send_message(&mut self, msg: AllMessages) { + self.sender().send_message(msg).await + } /// Send multiple direct messages to other `Subsystem`s, routed based on message type. 
async fn send_messages<T>(&mut self, msgs: T) - where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send; + where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send + { + self.sender().send_messages(msgs).await + } + + + /// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message + /// type. + /// + /// This function should be used only when there is some other bounding factor on the messages + /// sent with it. Otherwise, it risks a memory leak. + /// + /// Generally, for this method to be used, these conditions should be met: + /// * There is a communication cycle between subsystems + /// * One of the parts of the cycle has a clear bound on the number of messages produced. + fn send_unbounded_message(&mut self, msg: AllMessages) { + self.sender().send_unbounded_message(msg) + } } /// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`]. -- GitLab