From 6957847b6b0f35de497cf2836d618d9b3602e84b Mon Sep 17 00:00:00 2001 From: Andronik Ordian <write@reusable.software> Date: Thu, 9 Jul 2020 22:25:40 +0200 Subject: [PATCH] Integrate all (dummy) subsystems with the Overseer (#1374) * overseer: introduce a utility typemap * it's ugly but it compiles * move DummySubsystem to subsystem crate * fix tests fallout * use a struct for all subsystems * more tests fallout * add missing pov_distribution subsystem * remove unused imports and bounds * fix minimal-example --- .../node/overseer/examples/minimal-example.rs | 25 +- polkadot/node/overseer/src/lib.rs | 423 +++++++++++++++--- polkadot/node/service/src/lib.rs | 53 +-- polkadot/node/subsystem/src/lib.rs | 18 + polkadot/node/subsystem/src/messages.rs | 12 +- 5 files changed, 425 insertions(+), 106 deletions(-) diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs index aefb627f7f5..21e8eb3ab8f 100644 --- a/polkadot/node/overseer/examples/minimal-example.rs +++ b/polkadot/node/overseer/examples/minimal-example.rs @@ -28,11 +28,14 @@ use futures_timer::Delay; use kv_log_macro as log; use polkadot_primitives::parachain::{BlockData, PoVBlock}; -use polkadot_overseer::Overseer; +use polkadot_overseer::{Overseer, AllSubsystems}; -use polkadot_subsystem::{Subsystem, SubsystemContext, SpawnedSubsystem, FromOverseer}; +use polkadot_subsystem::{ + Subsystem, SubsystemContext, DummySubsystem, + SpawnedSubsystem, FromOverseer, +}; use polkadot_subsystem::messages::{ - AllMessages, CandidateBackingMessage, CandidateValidationMessage + CandidateValidationMessage, CandidateBackingMessage, AllMessages, }; struct Subsystem1; @@ -128,10 +131,22 @@ fn main() { Delay::new(Duration::from_secs(1)).await; }); + let all_subsystems = AllSubsystems { + candidate_validation: Subsystem2, + candidate_backing: Subsystem1, + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, _handler) = Overseer::new( vec![], - Subsystem2, - Subsystem1, + all_subsystems, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index ffebf33ce4f..458aae91e87 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -76,7 +76,11 @@ use polkadot_primitives::{Block, BlockNumber, Hash}; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; use polkadot_subsystem::messages::{ - CandidateValidationMessage, CandidateBackingMessage, AllMessages + CandidateValidationMessage, CandidateBackingMessage, + CandidateSelectionMessage, StatementDistributionMessage, + AvailabilityDistributionMessage, BitfieldDistributionMessage, + ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, + AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, }; pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, @@ -319,12 +323,40 @@ struct OverseenSubsystem<M> { /// The `Overseer` itself. pub struct Overseer<S: Spawn> { - /// A validation subsystem - validation_subsystem: OverseenSubsystem<CandidateValidationMessage>, + /// A candidate validation subsystem. + candidate_validation_subsystem: OverseenSubsystem<CandidateValidationMessage>, - /// A candidate backing subsystem + /// A candidate backing subsystem. candidate_backing_subsystem: OverseenSubsystem<CandidateBackingMessage>, + /// A candidate selection subsystem. + candidate_selection_subsystem: OverseenSubsystem<CandidateSelectionMessage>, + + /// A statement distribution subsystem. + statement_distribution_subsystem: OverseenSubsystem<StatementDistributionMessage>, + + /// An availability distribution subsystem. + availability_distribution_subsystem: OverseenSubsystem<AvailabilityDistributionMessage>, + + /// A bitfield distribution subsystem. + bitfield_distribution_subsystem: OverseenSubsystem<BitfieldDistributionMessage>, + + /// A provisioner subsystem. + provisioner_subsystem: OverseenSubsystem<ProvisionerMessage>, + + /// A PoV distribution subsystem. + pov_distribution_subsystem: OverseenSubsystem<PoVDistributionMessage>, + + /// A runtime API subsystem. + runtime_api_subsystem: OverseenSubsystem<RuntimeApiMessage>, + + /// An availability store subsystem. + availability_store_subsystem: OverseenSubsystem<AvailabilityStoreMessage>, + + /// A network bridge subsystem. + network_bridge_subsystem: OverseenSubsystem<NetworkBridgeMessage>, + + /// Spawner to spawn tasks to. s: S, @@ -346,22 +378,48 @@ pub struct Overseer<S: Spawn> { active_leaves: HashSet<(Hash, BlockNumber)>, } +/// This struct is passed as an argument to create a new instance of an [`Overseer`]. +/// +/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows +/// mocking in the test code: +/// +/// Each [`Subsystem`] is supposed to implement some interface that is generic over +/// message type that is specific to this [`Subsystem`]. At the moment not all +/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`]. +/// +/// [`Subsystem`]: trait.Subsystem.html +/// [`DummySubsystem`]: struct.DummySubsystem.html +pub struct AllSubsystems<CV, CB, CS, SD, AD, BD, P, PoVD, RA, AS, NB> { + /// A candidate validation subsystem. + pub candidate_validation: CV, + /// A candidate backing subsystem. + pub candidate_backing: CB, + /// A candidate selection subsystem. + pub candidate_selection: CS, + /// A statement distribution subsystem. + pub statement_distribution: SD, + /// An availability distribution subsystem. + pub availability_distribution: AD, + /// A bitfield distribution subsystem. + pub bitfield_distribution: BD, + /// A provisioner subsystem. + pub provisioner: P, + /// A PoV distribution subsystem. + pub pov_distribution: PoVD, + /// A runtime API subsystem. + pub runtime_api: RA, + /// An availability store subsystem. + pub availability_store: AS, + /// A network bridge subsystem. + pub network_bridge: NB, +} + impl<S> Overseer<S> where S: Spawn, { /// Create a new intance of the `Overseer` with a fixed set of [`Subsystem`]s. /// - /// Each [`Subsystem`] is passed to this function as an explicit parameter - /// and is supposed to implement some interface that is generic over message type - /// that is specific to this [`Subsystem`]. At the moment there are only two - /// subsystems: - /// * Validation - /// * CandidateBacking - /// - /// As any entity that satisfies the interface may act as a [`Subsystem`] this allows - /// mocking in the test code: - /// /// ```text /// +------------------------------------+ /// | Overseer | @@ -388,16 +446,16 @@ where /// # Example /// /// The [`Subsystems`] may be any type as long as they implement an expected interface. - /// Here, we create two mock subsystems and start the `Overseer` with them. For the sake - /// of simplicity the termination of the example is done with a timeout. + /// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with them. + /// For the sake of simplicity the termination of the example is done with a timeout. /// ``` /// # use std::time::Duration; /// # use futures::{executor, pin_mut, select, FutureExt}; /// # use futures_timer::Delay; - /// # use polkadot_overseer::Overseer; + /// # use polkadot_overseer::{Overseer, AllSubsystems}; /// # use polkadot_subsystem::{ - /// # Subsystem, SpawnedSubsystem, SubsystemContext, - /// # messages::{CandidateValidationMessage, CandidateBackingMessage}, + /// # Subsystem, DummySubsystem, SpawnedSubsystem, SubsystemContext, + /// # messages::CandidateValidationMessage, /// # }; /// /// struct ValidationSubsystem; @@ -417,28 +475,24 @@ where /// } /// } /// - /// struct CandidateBackingSubsystem; - /// impl<C> Subsystem<C> for CandidateBackingSubsystem - /// where C: SubsystemContext<Message=CandidateBackingMessage> - /// { - /// fn start( - /// self, - /// mut ctx: C, - /// ) -> SpawnedSubsystem { - /// SpawnedSubsystem(Box::pin(async move { - /// loop { - /// Delay::new(Duration::from_secs(1)).await; - /// } - /// })) - /// } - /// } - /// /// # fn main() { executor::block_on(async move { /// let spawner = executor::ThreadPool::new().unwrap(); + /// let all_subsystems = AllSubsystems { + /// candidate_validation: ValidationSubsystem, + /// candidate_backing: DummySubsystem, + /// candidate_selection: DummySubsystem, + /// statement_distribution: DummySubsystem, + /// availability_distribution: DummySubsystem, + /// bitfield_distribution: DummySubsystem, + /// provisioner: DummySubsystem, + /// pov_distribution: DummySubsystem, + /// runtime_api: DummySubsystem, + /// availability_store: DummySubsystem, + /// network_bridge: DummySubsystem, + /// }; /// let (overseer, _handler) = Overseer::new( /// vec![], - /// ValidationSubsystem, - /// CandidateBackingSubsystem, + /// all_subsystems, /// spawner, /// ).unwrap(); /// @@ -455,12 +509,24 @@ where /// # /// # }); } /// ``` - pub fn new( + pub fn new<CV, CB, CS, SD, AD, BD, P, PoVD, RA, AS, NB>( leaves: impl IntoIterator<Item = BlockInfo>, - validation: impl Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send, - candidate_backing: impl Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send, + all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BD, P, PoVD, RA, AS, NB>, mut s: S, - ) -> SubsystemResult<(Self, OverseerHandler)> { + ) -> SubsystemResult<(Self, OverseerHandler)> + where + CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send, + CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send, + CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send, + SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send, + AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send, + BD: Subsystem<OverseerSubsystemContext<BitfieldDistributionMessage>> + Send, + P: Subsystem<OverseerSubsystemContext<ProvisionerMessage>> + Send, + PoVD: Subsystem<OverseerSubsystemContext<PoVDistributionMessage>> + Send, + RA: Subsystem<OverseerSubsystemContext<RuntimeApiMessage>> + Send, + AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>> + Send, + NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>> + Send, + { let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); let handler = OverseerHandler { @@ -470,18 +536,81 @@ where let mut running_subsystems_rx = StreamUnordered::new(); let mut running_subsystems = FuturesUnordered::new(); - let validation_subsystem = spawn( + let candidate_validation_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - validation, + all_subsystems.candidate_validation, )?; let candidate_backing_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - candidate_backing, + all_subsystems.candidate_backing, + )?; + + let candidate_selection_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.candidate_selection, + )?; + + let statement_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.statement_distribution, + )?; + + let availability_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.availability_distribution, + )?; + + let bitfield_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.bitfield_distribution, + )?; + + let provisioner_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.provisioner, + )?; + + let pov_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.pov_distribution, + )?; + + let runtime_api_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.runtime_api, + )?; + + let availability_store_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.availability_store, + )?; + + let network_bridge_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.network_bridge, )?; let active_leaves = HashSet::new(); @@ -492,8 +621,17 @@ where .collect(); let this = Self { - validation_subsystem, + candidate_validation_subsystem, candidate_backing_subsystem, + candidate_selection_subsystem, + statement_distribution_subsystem, + availability_distribution_subsystem, + bitfield_distribution_subsystem, + provisioner_subsystem, + pov_distribution_subsystem, + runtime_api_subsystem, + availability_store_subsystem, + network_bridge_subsystem, s, running_subsystems, running_subsystems_rx, @@ -507,7 +645,7 @@ where // Stop the overseer. async fn stop(mut self) { - if let Some(ref mut s) = self.validation_subsystem.instance { + if let Some(ref mut s) = self.candidate_validation_subsystem.instance { let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } @@ -515,6 +653,42 @@ where let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } + if let Some(ref mut s) = self.candidate_selection_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.statement_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.provisioner_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.pov_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.runtime_api_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.network_bridge_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse(); loop { @@ -616,11 +790,47 @@ where } async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { - if let Some(ref mut s) = self.validation_subsystem.instance { + if let Some(ref mut s) = self.candidate_validation_subsystem.instance { s.tx.send(FromOverseer::Signal(signal.clone())).await?; } if let Some(ref mut s) = self.candidate_backing_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.candidate_selection_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.statement_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.provisioner_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.pov_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.runtime_api_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.availability_store_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.network_bridge_subsystem.instance { s.tx.send(FromOverseer::Signal(signal)).await?; } @@ -630,7 +840,7 @@ where async fn route_message(&mut self, msg: AllMessages) { match msg { AllMessages::CandidateValidation(msg) => { - if let Some(ref mut s) = self.validation_subsystem.instance { + if let Some(ref mut s) = self.candidate_validation_subsystem.instance { let _= s.tx.send(FromOverseer::Communication { msg }).await; } } @@ -639,11 +849,50 @@ where let _ = s.tx.send(FromOverseer::Communication { msg }).await; } } - _ => { - // TODO: temporary catch-all until all subsystems are integrated with overseer. - // The overseer is not complete until this is an exhaustive match with all - // messages targeting an included subsystem. - // https://github.com/paritytech/polkadot/issues/1317 + AllMessages::CandidateSelection(msg) => { + if let Some(ref mut s) = self.candidate_selection_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::StatementDistribution(msg) => { + if let Some(ref mut s) = self.statement_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::AvailabilityDistribution(msg) => { + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::BitfieldDistribution(msg) => { + if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::Provisioner(msg) => { + if let Some(ref mut s) = self.provisioner_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::PoVDistribution(msg) => { + if let Some(ref mut s) = self.pov_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::RuntimeApi(msg) => { + if let Some(ref mut s) = self.runtime_api_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::AvailabilityStore(msg) => { + if let Some(ref mut s) = self.availability_store_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::NetworkBridge(msg) => { + if let Some(ref mut s) = self.network_bridge_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } } } } @@ -678,13 +927,16 @@ fn spawn<S: Spawn, M: Send + 'static>( }) } + #[cfg(test)] mod tests { use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; use polkadot_primitives::parachain::{BlockData, PoVBlock}; + use polkadot_subsystem::DummySubsystem; use super::*; + struct TestSubsystem1(mpsc::Sender<usize>); impl<C> Subsystem<C> for TestSubsystem1 @@ -776,10 +1028,22 @@ mod tests { let (s1_tx, mut s1_rx) = mpsc::channel(64); let (s2_tx, mut s2_rx) = mpsc::channel(64); + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem1(s1_tx), + candidate_backing: TestSubsystem2(s2_tx), + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, mut handler) = Overseer::new( vec![], - TestSubsystem1(s1_tx), - TestSubsystem2(s2_tx), + all_subsystems, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -826,10 +1090,22 @@ mod tests { executor::block_on(async move { let (s1_tx, _) = mpsc::channel(64); + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem1(s1_tx), + candidate_backing: TestSubsystem4, + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, _handle) = Overseer::new( vec![], - TestSubsystem1(s1_tx), - TestSubsystem4, + all_subsystems, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -923,11 +1199,22 @@ mod tests { let (tx_5, mut rx_5) = mpsc::channel(64); let (tx_6, mut rx_6) = mpsc::channel(64); - + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem5(tx_5), + candidate_backing: TestSubsystem6(tx_6), + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, mut handler) = Overseer::new( vec![first_block], - TestSubsystem5(tx_5), - TestSubsystem6(tx_6), + all_subsystems, spawner, ).unwrap(); @@ -1008,11 +1295,23 @@ mod tests { let (tx_5, mut rx_5) = mpsc::channel(64); let (tx_6, mut rx_6) = mpsc::channel(64); + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem5(tx_5), + candidate_backing: TestSubsystem6(tx_6), + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; // start with two forks of different height. let (overseer, mut handler) = Overseer::new( vec![first_block, second_block], - TestSubsystem5(tx_5), - TestSubsystem6(tx_6), + all_subsystems, spawner, ).unwrap(); diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 772481672ad..8a435e6b595 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -29,11 +29,8 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use sc_executor::native_executor_instance; use log::info; use sp_blockchain::HeaderBackend; -use polkadot_overseer::{self as overseer, BlockInfo, Overseer, OverseerHandler}; -use polkadot_subsystem::{ - Subsystem, SubsystemContext, SpawnedSubsystem, - messages::{CandidateValidationMessage, CandidateBackingMessage}, -}; +use polkadot_overseer::{self as overseer, AllSubsystems, BlockInfo, Overseer, OverseerHandler}; +use polkadot_subsystem::DummySubsystem; use polkadot_node_core_proposer::ProposerFactory; use sp_trie::PrefixedMemoryDB; pub use service::{ @@ -275,38 +272,28 @@ macro_rules! new_full_start { }} } -struct CandidateValidationSubsystem; - -impl<C> Subsystem<C> for CandidateValidationSubsystem - where C: SubsystemContext<Message = CandidateValidationMessage> -{ - fn start(self, mut ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { - while let Ok(_) = ctx.recv().await {} - })) - } -} - -struct CandidateBackingSubsystem; - -impl<C> Subsystem<C> for CandidateBackingSubsystem - where C: SubsystemContext<Message = CandidateBackingMessage> -{ - fn start(self, mut ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { - while let Ok(_) = ctx.recv().await {} - })) - } -} - fn real_overseer<S: futures::task::Spawn>( leaves: impl IntoIterator<Item = BlockInfo>, s: S, ) -> Result<(Overseer<S>, OverseerHandler), ServiceError> { - let validation = CandidateValidationSubsystem; - let candidate_backing = CandidateBackingSubsystem; - Overseer::new(leaves, validation, candidate_backing, s) - .map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) + let all_subsystems = AllSubsystems { + candidate_validation: DummySubsystem, + candidate_backing: DummySubsystem, + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; + Overseer::new( + leaves, + all_subsystems, + s, + ).map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) } /// Builds a new service for a full client. diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index fd32d7cfdbc..db9a0629cfd 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -148,3 +148,21 @@ pub trait Subsystem<C: SubsystemContext> { /// Start this `Subsystem` and return `SpawnedSubsystem`. fn start(self, ctx: C) -> SpawnedSubsystem; } + +/// A dummy subsystem that implements [`Subsystem`] for all +/// types of messages. Used for tests or as a placeholder. +pub struct DummySubsystem; + +impl<C: SubsystemContext> Subsystem<C> for DummySubsystem { + fn start(self, mut ctx: C) -> SpawnedSubsystem { + SpawnedSubsystem(Box::pin(async move { + loop { + match ctx.recv().await { + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return, + Err(_) => return, + _ => continue, + } + } + })) + } +} diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index c98658e4aea..36c293e49fe 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -246,12 +246,12 @@ pub enum PoVDistributionMessage { /// /// This `CandidateDescriptor` should correspond to a candidate seconded under the provided /// relay-parent hash. - FetchPoV(Hash, CandidateDescriptor, oneshot::Sender<Arc<PoVBlock>>), - /// Distribute a PoV for the given relay-parent and CandidateDescriptor. - /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor - DistributePoV(Hash, CandidateDescriptor, Arc<PoVBlock>), - /// An update from the network bridge. - NetworkBridgeUpdate(NetworkBridgeEvent), + FetchPoV(Hash, CandidateDescriptor, oneshot::Sender<Arc<PoVBlock>>), + /// Distribute a PoV for the given relay-parent and CandidateDescriptor. + /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor + DistributePoV(Hash, CandidateDescriptor, Arc<PoVBlock>), + /// An update from the network bridge. + NetworkBridgeUpdate(NetworkBridgeEvent), } /// A message type tying together all message types that are used across Subsystems. -- GitLab