diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index 552705f299b8413b35557d27c873886c1ebd4c0c..92518ef22dee2d099bd17610947017c3bc1bdd85 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -73,7 +73,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen Some(Box::new(grandpa_block_import.clone())), client.clone(), inherent_data_providers.clone(), - &task_manager.spawn_handle(), + &task_manager.spawn_essential_handle(), config.prometheus_registry(), sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()), )?; @@ -295,7 +295,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError> Some(Box::new(grandpa_block_import)), client.clone(), InherentDataProviders::new(), - &task_manager.spawn_handle(), + &task_manager.spawn_essential_handle(), config.prometheus_registry(), sp_consensus::NeverCanAuthor, )?; diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index df3802d3d802cd741141fd555ac27be181907dfa..db4ed3f4f1dc4af6bf30d53eb46271d778c93b22 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -94,7 +94,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen client.clone(), select_chain.clone(), inherent_data_providers.clone(), - &task_manager.spawn_handle(), + &task_manager.spawn_essential_handle(), config.prometheus_registry(), sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()), )?; @@ -405,7 +405,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<( client.clone(), select_chain.clone(), inherent_data_providers.clone(), - &task_manager.spawn_handle(), + &task_manager.spawn_essential_handle(), config.prometheus_registry(), sp_consensus::NeverCanAuthor, )?; diff --git a/substrate/client/consensus/aura/src/lib.rs b/substrate/client/consensus/aura/src/lib.rs index eb3c2e93e7044f612a59d931357b03e879aedf03..0702ccd7f1357f9d32e47618c4e1382c94069178 100644 --- a/substrate/client/consensus/aura/src/lib.rs +++ b/substrate/client/consensus/aura/src/lib.rs @@ -849,7 +849,7 @@ pub fn import_queue<B, I, C, P, S, CAW>( P: Pair + Send + Sync + 'static, P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, P::Signature: Encode + Decode, - S: sp_core::traits::SpawnNamed, + S: sp_core::traits::SpawnEssentialNamed, CAW: CanAuthorWith<B> + Send + Sync + 'static, { register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?; diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index 61be3a2f5e5bca8455814a7fa679e7310b62b27b..a6530dea08dc54c41f4c3ad70f946b8cd4dd3835 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -1491,7 +1491,7 @@ pub fn import_queue<Block: BlockT, Client, SelectChain, Inner, CAW>( client: Arc<Client>, select_chain: SelectChain, inherent_data_providers: InherentDataProviders, - spawner: &impl sp_core::traits::SpawnNamed, + spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, can_author_with: CAW, ) -> ClientResult<DefaultImportQueue<Block, Client>> where diff --git a/substrate/client/consensus/manual-seal/src/lib.rs b/substrate/client/consensus/manual-seal/src/lib.rs index 3ec68588573ebbd48fb3f39f0aa833d619720508..320f196c1052cbae3e55de11d9546303283177ea 100644 --- a/substrate/client/consensus/manual-seal/src/lib.rs +++ b/substrate/client/consensus/manual-seal/src/lib.rs @@ -73,7 +73,7 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier { /// Instantiate the import queue for the manual seal consensus engine. pub fn import_queue<Block, Transaction>( block_import: BoxBlockImport<Block, Transaction>, - spawner: &impl sp_core::traits::SpawnNamed, + spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, ) -> BasicQueue<Block, Transaction> where diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs index 5ac8a41417a8812dddc7cc035f511cb8955881b4..3c7f1a832d3cd98210a89e958cff52548b9b7c38 100644 --- a/substrate/client/consensus/pow/src/lib.rs +++ b/substrate/client/consensus/pow/src/lib.rs @@ -505,7 +505,7 @@ pub fn import_queue<B, Transaction, Algorithm>( justification_import: Option<BoxJustificationImport<B>>, algorithm: Algorithm, inherent_data_providers: InherentDataProviders, - spawner: &impl sp_core::traits::SpawnNamed, + spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, ) -> Result< PowImportQueue<B, Transaction>, diff --git a/substrate/client/service/src/task_manager/mod.rs b/substrate/client/service/src/task_manager/mod.rs index 02d83e6dce7c1469067844262eb8225d237c5295..c7254f1f894de9e63dcdeba0c37bc3cde9f2b8db 100644 --- a/substrate/client/service/src/task_manager/mod.rs +++ b/substrate/client/service/src/task_manager/mod.rs @@ -150,6 +150,7 @@ impl sp_core::traits::SpawnNamed for SpawnTaskHandle { /// task spawned through it fails. The service should be on the receiver side /// and will shut itself down whenever it receives any message, i.e. an /// essential task has failed. +#[derive(Clone)] pub struct SpawnEssentialTaskHandle { essential_failed_tx: TracingUnboundedSender<()>, inner: SpawnTaskHandle, @@ -203,6 +204,16 @@ impl SpawnEssentialTaskHandle { } } +impl sp_core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle { + fn spawn_essential_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) { + self.spawn_blocking(name, future); + } + + fn spawn_essential(&self, name: &'static str, future: BoxFuture<'static, ()>) { + self.spawn(name, future); + } +} + /// Helper struct to manage background/async tasks in Service. pub struct TaskManager { /// A future that resolves when the service has exited, this is useful to diff --git a/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs b/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs index 541c1ff0f4ed7000cd33e510e8e09e6be4dfd435..f1b42e1460e59165b35ba02f4d300622495b7972 100644 --- a/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -62,7 +62,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> { verifier: V, block_import: BoxBlockImport<B, Transaction>, justification_import: Option<BoxJustificationImport<B>>, - spawner: &impl sp_core::traits::SpawnNamed, + spawner: &impl sp_core::traits::SpawnEssentialNamed, prometheus_registry: Option<&Registry>, ) -> Self { let (result_sender, result_port) = buffered_link::buffered_link(); @@ -83,7 +83,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> { metrics, ); - spawner.spawn_blocking("basic-block-import-worker", future.boxed()); + spawner.spawn_essential_blocking("basic-block-import-worker", future.boxed()); Self { justification_sender, @@ -164,7 +164,13 @@ async fn block_import_process<B: BlockT, Transaction: Send>( loop { let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await { Some(blocks) => blocks, - None => return, + None => { + log::debug!( + target: "block-import", + "Stopping block import because the import channel was closed!", + ); + return + }, }; let res = import_many_blocks( @@ -236,6 +242,10 @@ impl<B: BlockT> BlockImportWorker<B> { // If the results sender is closed, that means that the import queue is shutting // down and we should end this future. if worker.result_sender.is_closed() { + log::debug!( + target: "block-import", + "Stopping block import because result channel was closed!", + ); return; } @@ -244,7 +254,13 @@ impl<B: BlockT> BlockImportWorker<B> { match justification { Some(ImportJustification(who, hash, number, justification)) => worker.import_justification(who, hash, number, justification), - None => return, + None => { + log::debug!( + target: "block-import", + "Stopping block import because justification channel was closed!", + ); + return + }, } } diff --git a/substrate/primitives/core/src/testing.rs b/substrate/primitives/core/src/testing.rs index 1506abb77f9c1cab6068c8d856d0c2019f021ebc..b33f518c32ee0db6f27699d950452656ab288d66 100644 --- a/substrate/primitives/core/src/testing.rs +++ b/substrate/primitives/core/src/testing.rs @@ -152,3 +152,13 @@ impl crate::traits::SpawnNamed for TaskExecutor { self.0.spawn_ok(future); } } + +#[cfg(feature = "std")] +impl crate::traits::SpawnEssentialNamed for TaskExecutor { + fn spawn_essential_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + self.0.spawn_ok(future); + } + fn spawn_essential(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + self.0.spawn_ok(future); + } +} diff --git a/substrate/primitives/core/src/traits.rs b/substrate/primitives/core/src/traits.rs index 15c1816331d00a87185c8cac8b0ed215d243c90c..90f8060f9a5654fe2a008ae7719b8fdbe5d0da34 100644 --- a/substrate/primitives/core/src/traits.rs +++ b/substrate/primitives/core/src/traits.rs @@ -205,7 +205,7 @@ sp_externalities::decl_extension! { pub struct RuntimeSpawnExt(Box<dyn RuntimeSpawn>); } -/// Something that can spawn futures (blocking and non-blocking) with an assigned name. +/// Something that can spawn tasks (blocking and non-blocking) with an assigned name. #[dyn_clonable::clonable] pub trait SpawnNamed: Clone + Send + Sync { /// Spawn the given blocking future. @@ -227,3 +227,28 @@ impl SpawnNamed for Box<dyn SpawnNamed> { (**self).spawn(name, future) } } + +/// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name. +/// +/// Essential tasks are special tasks that should take down the node when they end. +#[dyn_clonable::clonable] +pub trait SpawnEssentialNamed: Clone + Send + Sync { + /// Spawn the given blocking future. + /// + /// The given `name` is used to identify the future in tracing. + fn spawn_essential_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); + /// Spawn the given non-blocking future. + /// + /// The given `name` is used to identify the future in tracing. + fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); +} + +impl SpawnEssentialNamed for Box<dyn SpawnEssentialNamed> { + fn spawn_essential_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { + (**self).spawn_essential_blocking(name, future) + } + + fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { + (**self).spawn_essential(name, future) + } +}