diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index aef1632a9439d3745ade82c0bca115e2f66f42b9..6f90779afe62c17058ecb2b4d2a7a800e8672b64 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -164,7 +164,7 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> { } /// The network bridge subsystem. -pub struct NetworkBridge<N>(Option<N>); +pub struct NetworkBridge<N>(N); impl<N> NetworkBridge<N> { /// Create a new network bridge subsystem with underlying network service. @@ -172,7 +172,7 @@ impl<N> NetworkBridge<N> { /// This assumes that the network service has had the notifications protocol for the network /// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info). pub fn new(net_service: N) -> Self { - NetworkBridge(Some(net_service)) + NetworkBridge(net_service) } } @@ -181,18 +181,10 @@ impl<Net, Context> Subsystem<Context> for NetworkBridge<Net> Net: Network, Context: SubsystemContext<Message=NetworkBridgeMessage>, { - fn start(&mut self, mut ctx: Context) -> SpawnedSubsystem { - SpawnedSubsystem(match self.0.take() { - None => async move { for _ in ctx.recv().await { } }.boxed(), - Some(net) => { - // Swallow error because failure is fatal to the node and we log with more precision - // within `run_network`. - run_network(net, ctx).map(|_| ()).boxed() - } - }) - - - + fn start(self, ctx: Context) -> SpawnedSubsystem { + // Swallow error because failure is fatal to the node and we log with more precision + // within `run_network`. + SpawnedSubsystem(run_network(self.0, ctx).map(|_| ()).boxed()) } } diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs index 0edc87a6b8db7e63a6364a667cb7c5a1a1d4124e..cdef0340d04fec165ff7109848f156dd98dab26b 100644 --- a/polkadot/node/overseer/examples/minimal-example.rs +++ b/polkadot/node/overseer/examples/minimal-example.rs @@ -74,7 +74,7 @@ impl Subsystem1 { impl<C> Subsystem<C> for Subsystem1 where C: SubsystemContext<Message=CandidateBackingMessage> { - fn start(&mut self, ctx: C) -> SpawnedSubsystem { + fn start(self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { Self::run(ctx).await; })) @@ -111,7 +111,7 @@ impl Subsystem2 { impl<C> Subsystem<C> for Subsystem2 where C: SubsystemContext<Message=CandidateValidationMessage> { - fn start(&mut self, ctx: C) -> SpawnedSubsystem { + fn start(self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { Self::run(ctx).await; })) @@ -129,8 +129,8 @@ fn main() { let (overseer, _handler) = Overseer::new( vec![], - Box::new(Subsystem2), - Box::new(Subsystem1), + Subsystem2, + Subsystem1, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 8fb8706be429251a22874ac20c7e96e20b5ae3c3..706ba58a5e22324a42d62dd38e577ea431119d97 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -314,7 +314,6 @@ pub type CompatibleSubsystem<M> = Box<dyn Subsystem<OverseerSubsystemContext<M>> /// [`Subsystem`]: trait.Subsystem.html #[allow(dead_code)] struct OverseenSubsystem<M> { - subsystem: CompatibleSubsystem<M>, instance: Option<SubsystemInstance<M>>, } @@ -407,7 +406,7 @@ where /// where C: SubsystemContext<Message=CandidateValidationMessage> /// { /// fn start( - /// &mut self, + /// self, /// mut ctx: C, /// ) -> SpawnedSubsystem { /// SpawnedSubsystem(Box::pin(async move { @@ -423,7 +422,7 @@ where /// where C: SubsystemContext<Message=CandidateBackingMessage> /// { /// fn start( - /// &mut self, + /// self, /// mut ctx: C, /// ) -> SpawnedSubsystem { /// SpawnedSubsystem(Box::pin(async move { @@ -438,8 +437,8 @@ where /// let spawner = executor::ThreadPool::new().unwrap(); /// let (overseer, _handler) = Overseer::new( /// vec![], - /// Box::new(ValidationSubsystem), - /// Box::new(CandidateBackingSubsystem), + /// ValidationSubsystem, + /// CandidateBackingSubsystem, /// spawner, /// ).unwrap(); /// @@ -458,8 +457,8 @@ where /// ``` pub fn new( leaves: impl IntoIterator<Item = BlockInfo>, - validation: CompatibleSubsystem<CandidateValidationMessage>, - candidate_backing: CompatibleSubsystem<CandidateBackingMessage>, + validation: impl Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send, + candidate_backing: impl Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> { let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); @@ -658,7 +657,7 @@ fn spawn<S: Spawn, M: Send + 'static>( spawner: &mut S, futures: &mut FuturesUnordered<RemoteHandle<()>>, streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>, - mut s: CompatibleSubsystem<M>, + s: impl Subsystem<OverseerSubsystemContext<M>>, ) -> SubsystemResult<OverseenSubsystem<M>> { let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY); @@ -675,7 +674,6 @@ fn spawn<S: Spawn, M: Send + 'static>( }); Ok(OverseenSubsystem { - subsystem: s, instance, }) } @@ -692,8 +690,8 @@ mod tests { impl<C> Subsystem<C> for TestSubsystem1 where C: SubsystemContext<Message=CandidateValidationMessage> { - fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { - let mut sender = self.0.clone(); + fn start(self, mut ctx: C) -> SpawnedSubsystem { + let mut sender = self.0; SpawnedSubsystem(Box::pin(async move { let mut i = 0; loop { @@ -717,8 +715,10 @@ mod tests { impl<C> Subsystem<C> for TestSubsystem2 where C: SubsystemContext<Message=CandidateBackingMessage> { - fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { + fn start(self, mut ctx: C) -> SpawnedSubsystem { + let sender = self.0.clone(); SpawnedSubsystem(Box::pin(async move { + let _sender = sender; let mut c: usize = 0; loop { if c < 10 { @@ -759,7 +759,7 @@ mod tests { impl<C> Subsystem<C> for TestSubsystem4 where C: SubsystemContext<Message=CandidateBackingMessage> { - fn start(&mut self, mut _ctx: C) -> SpawnedSubsystem { + fn start(self, mut _ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { // Do nothing and exit. })) @@ -777,8 +777,8 @@ mod tests { let (overseer, mut handler) = Overseer::new( vec![], - Box::new(TestSubsystem1(s1_tx)), - Box::new(TestSubsystem2(s2_tx)), + TestSubsystem1(s1_tx), + TestSubsystem2(s2_tx), spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -827,8 +827,8 @@ mod tests { let (s1_tx, _) = mpsc::channel(64); let (overseer, _handle) = Overseer::new( vec![], - Box::new(TestSubsystem1(s1_tx)), - Box::new(TestSubsystem4), + TestSubsystem1(s1_tx), + TestSubsystem4, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -846,7 +846,7 @@ mod tests { impl<C> Subsystem<C> for TestSubsystem5 where C: SubsystemContext<Message=CandidateValidationMessage> { - fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { + fn start(self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); SpawnedSubsystem(Box::pin(async move { @@ -872,7 +872,7 @@ mod tests { impl<C> Subsystem<C> for TestSubsystem6 where C: SubsystemContext<Message=CandidateBackingMessage> { - fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { + fn start(self, mut ctx: C) -> SpawnedSubsystem { let mut sender = self.0.clone(); SpawnedSubsystem(Box::pin(async move { @@ -925,8 +925,8 @@ mod tests { let (overseer, mut handler) = Overseer::new( vec![first_block], - Box::new(TestSubsystem5(tx_5)), - Box::new(TestSubsystem6(tx_6)), + TestSubsystem5(tx_5), + TestSubsystem6(tx_6), spawner, ).unwrap(); @@ -1010,8 +1010,8 @@ mod tests { // start with two forks of different height. let (overseer, mut handler) = Overseer::new( vec![first_block, second_block], - Box::new(TestSubsystem5(tx_5)), - Box::new(TestSubsystem6(tx_6)), + TestSubsystem5(tx_5), + TestSubsystem6(tx_6), spawner, ).unwrap(); diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 8b0e27b8a6b7b3cc3e2d65aedb1aacf28873abd0..c798d3e9aa7174121cd6e5f05f2dc1be74a539e2 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -272,7 +272,7 @@ struct CandidateValidationSubsystem; impl<C> Subsystem<C> for CandidateValidationSubsystem where C: SubsystemContext<Message = CandidateValidationMessage> { - fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { + fn start(self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { while let Ok(_) = ctx.recv().await {} })) @@ -284,7 +284,7 @@ struct CandidateBackingSubsystem; impl<C> Subsystem<C> for CandidateBackingSubsystem where C: SubsystemContext<Message = CandidateBackingMessage> { - fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { + fn start(self, mut ctx: C) -> SpawnedSubsystem { SpawnedSubsystem(Box::pin(async move { while let Ok(_) = ctx.recv().await {} })) @@ -295,8 +295,8 @@ fn real_overseer<S: futures::task::Spawn>( leaves: impl IntoIterator<Item = BlockInfo>, s: S, ) -> Result<(Overseer<S>, OverseerHandler), ServiceError> { - let validation = Box::new(CandidateValidationSubsystem); - let candidate_backing = Box::new(CandidateBackingSubsystem); + let validation = CandidateValidationSubsystem; + let candidate_backing = CandidateBackingSubsystem; Overseer::new(leaves, validation, candidate_backing, s) .map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) } diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 31d094907f517f79b37968768998825d218579cb..fd32d7cfdbc981742017daaaaee36c7f2b9bc10b 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -146,5 +146,5 @@ pub trait SubsystemContext: Send + 'static { /// [`Subsystem`]: trait.Subsystem.html pub trait Subsystem<C: SubsystemContext> { /// Start this `Subsystem` and return `SpawnedSubsystem`. - fn start(&mut self, ctx: C) -> SpawnedSubsystem; + fn start(self, ctx: C) -> SpawnedSubsystem; }