From 821e018d75690836c5fab90be11e6127dde2d7a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= <bkchr@users.noreply.github.com> Date: Fri, 19 Feb 2021 17:31:03 +0100 Subject: [PATCH] Ensure we spawn the block import worker as an essential task (#8155) * Ensure we spawn the block import worker as an essential task This pr ensures that we spawn the block import worker as an essential task. This is quite important as we need to bring down the node when the block import is done. Besides that it adds some debug output to the block import worker. * Don't be stupid :D --- .../bin/node-template/node/src/service.rs | 4 +-- substrate/bin/node/cli/src/service.rs | 4 +-- substrate/client/consensus/aura/src/lib.rs | 2 +- substrate/client/consensus/babe/src/lib.rs | 2 +- .../client/consensus/manual-seal/src/lib.rs | 2 +- substrate/client/consensus/pow/src/lib.rs | 2 +- .../client/service/src/task_manager/mod.rs | 11 ++++++++ .../common/src/import_queue/basic_queue.rs | 24 ++++++++++++++--- substrate/primitives/core/src/testing.rs | 10 +++++++ substrate/primitives/core/src/traits.rs | 27 ++++++++++++++++++- 10 files changed, 75 insertions(+), 13 deletions(-) diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index 552705f299b..92518ef22de 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -73,7 +73,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen Some(Box::new(grandpa_block_import.clone())), client.clone(), inherent_data_providers.clone(), - &task_manager.spawn_handle(), + &task_manager.spawn_essential_handle(), config.prometheus_registry(), sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()), )?; @@ -295,7 +295,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError> Some(Box::new(grandpa_block_import)), client.clone(), InherentDataProviders::new(), - &task_manager.spawn_handle(), + &task_manager.spawn_essential_handle(), config.prometheus_registry(), sp_consensus::NeverCanAuthor, )?; diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index df3802d3d80..db4ed3f4f1d 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -94,7 +94,7 @@ pub fn new_partial(config: &Configuration) -> Result<sc_service::PartialComponen client.clone(), select_chain.clone(), inherent_data_providers.clone(), - &task_manager.spawn_handle(), + &task_manager.spawn_essential_handle(), config.prometheus_registry(), sp_consensus::CanAuthorWithNativeVersion::new(client.executor().clone()), )?; @@ -405,7 +405,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<( client.clone(), select_chain.clone(), inherent_data_providers.clone(), - &task_manager.spawn_handle(), + &task_manager.spawn_essential_handle(), config.prometheus_registry(), sp_consensus::NeverCanAuthor, )?; diff --git a/substrate/client/consensus/aura/src/lib.rs b/substrate/client/consensus/aura/src/lib.rs index eb3c2e93e70..0702ccd7f13 100644 --- a/substrate/client/consensus/aura/src/lib.rs +++ b/substrate/client/consensus/aura/src/lib.rs @@ -849,7 +849,7 @@ pub fn import_queue<B, I, C, P, S, CAW>( P: Pair + Send + Sync + 'static, P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, P::Signature: Encode + Decode, - S: sp_core::traits::SpawnNamed, + S: sp_core::traits::SpawnEssentialNamed, CAW: CanAuthorWith<B> + Send + Sync + 'static, { register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?; diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index 61be3a2f5e5..a6530dea08d 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -1491,7 +1491,7 @@ pub fn import_queue<Block: BlockT, Client, SelectChain, Inner, CAW>( client: Arc<Client>, select_chain: SelectChain, inherent_data_providers: InherentDataProviders, - spawner: &impl sp_core::traits::SpawnNamed, + spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, can_author_with: CAW, ) -> ClientResult<DefaultImportQueue<Block, Client>> where diff --git a/substrate/client/consensus/manual-seal/src/lib.rs b/substrate/client/consensus/manual-seal/src/lib.rs index 3ec68588573..320f196c105 100644 --- a/substrate/client/consensus/manual-seal/src/lib.rs +++ b/substrate/client/consensus/manual-seal/src/lib.rs @@ -73,7 +73,7 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier { /// Instantiate the import queue for the manual seal consensus engine. pub fn import_queue<Block, Transaction>( block_import: BoxBlockImport<Block, Transaction>, - spawner: &impl sp_core::traits::SpawnNamed, + spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, ) -> BasicQueue<Block, Transaction> where diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs index 5ac8a41417a..3c7f1a832d3 100644 --- a/substrate/client/consensus/pow/src/lib.rs +++ b/substrate/client/consensus/pow/src/lib.rs @@ -505,7 +505,7 @@ pub fn import_queue<B, Transaction, Algorithm>( justification_import: Option<BoxJustificationImport<B>>, algorithm: Algorithm, inherent_data_providers: InherentDataProviders, - spawner: &impl sp_core::traits::SpawnNamed, + spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, ) -> Result< PowImportQueue<B, Transaction>, diff --git a/substrate/client/service/src/task_manager/mod.rs b/substrate/client/service/src/task_manager/mod.rs index 02d83e6dce7..c7254f1f894 100644 --- a/substrate/client/service/src/task_manager/mod.rs +++ b/substrate/client/service/src/task_manager/mod.rs @@ -150,6 +150,7 @@ impl sp_core::traits::SpawnNamed for SpawnTaskHandle { /// task spawned through it fails. The service should be on the receiver side /// and will shut itself down whenever it receives any message, i.e. an /// essential task has failed. +#[derive(Clone)] pub struct SpawnEssentialTaskHandle { essential_failed_tx: TracingUnboundedSender<()>, inner: SpawnTaskHandle, @@ -203,6 +204,16 @@ impl SpawnEssentialTaskHandle { } } +impl sp_core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle { + fn spawn_essential_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) { + self.spawn_blocking(name, future); + } + + fn spawn_essential(&self, name: &'static str, future: BoxFuture<'static, ()>) { + self.spawn(name, future); + } +} + /// Helper struct to manage background/async tasks in Service. pub struct TaskManager { /// A future that resolves when the service has exited, this is useful to diff --git a/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs b/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs index 541c1ff0f4e..f1b42e1460e 100644 --- a/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -62,7 +62,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> { verifier: V, block_import: BoxBlockImport<B, Transaction>, justification_import: Option<BoxJustificationImport<B>>, - spawner: &impl sp_core::traits::SpawnNamed, + spawner: &impl sp_core::traits::SpawnEssentialNamed, prometheus_registry: Option<&Registry>, ) -> Self { let (result_sender, result_port) = buffered_link::buffered_link(); @@ -83,7 +83,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> { metrics, ); - spawner.spawn_blocking("basic-block-import-worker", future.boxed()); + spawner.spawn_essential_blocking("basic-block-import-worker", future.boxed()); Self { justification_sender, @@ -164,7 +164,13 @@ async fn block_import_process<B: BlockT, Transaction: Send>( loop { let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await { Some(blocks) => blocks, - None => return, + None => { + log::debug!( + target: "block-import", + "Stopping block import because the import channel was closed!", + ); + return + }, }; let res = import_many_blocks( @@ -236,6 +242,10 @@ impl<B: BlockT> BlockImportWorker<B> { // If the results sender is closed, that means that the import queue is shutting // down and we should end this future. if worker.result_sender.is_closed() { + log::debug!( + target: "block-import", + "Stopping block import because result channel was closed!", + ); return; } @@ -244,7 +254,13 @@ impl<B: BlockT> BlockImportWorker<B> { match justification { Some(ImportJustification(who, hash, number, justification)) => worker.import_justification(who, hash, number, justification), - None => return, + None => { + log::debug!( + target: "block-import", + "Stopping block import because justification channel was closed!", + ); + return + }, } } diff --git a/substrate/primitives/core/src/testing.rs b/substrate/primitives/core/src/testing.rs index 1506abb77f9..b33f518c32e 100644 --- a/substrate/primitives/core/src/testing.rs +++ b/substrate/primitives/core/src/testing.rs @@ -152,3 +152,13 @@ impl crate::traits::SpawnNamed for TaskExecutor { self.0.spawn_ok(future); } } + +#[cfg(feature = "std")] +impl crate::traits::SpawnEssentialNamed for TaskExecutor { + fn spawn_essential_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + self.0.spawn_ok(future); + } + fn spawn_essential(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + self.0.spawn_ok(future); + } +} diff --git a/substrate/primitives/core/src/traits.rs b/substrate/primitives/core/src/traits.rs index 15c1816331d..90f8060f9a5 100644 --- a/substrate/primitives/core/src/traits.rs +++ b/substrate/primitives/core/src/traits.rs @@ -205,7 +205,7 @@ sp_externalities::decl_extension! { pub struct RuntimeSpawnExt(Box<dyn RuntimeSpawn>); } -/// Something that can spawn futures (blocking and non-blocking) with an assigned name. +/// Something that can spawn tasks (blocking and non-blocking) with an assigned name. #[dyn_clonable::clonable] pub trait SpawnNamed: Clone + Send + Sync { /// Spawn the given blocking future. @@ -227,3 +227,28 @@ impl SpawnNamed for Box<dyn SpawnNamed> { (**self).spawn(name, future) } } + +/// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name. +/// +/// Essential tasks are special tasks that should take down the node when they end. +#[dyn_clonable::clonable] +pub trait SpawnEssentialNamed: Clone + Send + Sync { + /// Spawn the given blocking future. + /// + /// The given `name` is used to identify the future in tracing. + fn spawn_essential_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); + /// Spawn the given non-blocking future. + /// + /// The given `name` is used to identify the future in tracing. + fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); +} + +impl SpawnEssentialNamed for Box<dyn SpawnEssentialNamed> { + fn spawn_essential_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { + (**self).spawn_essential_blocking(name, future) + } + + fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { + (**self).spawn_essential(name, future) + } +} -- GitLab