diff --git a/Cargo.lock b/Cargo.lock index 6f1e91f7b025a44deb1eb679debc4d6e858175dd..5935e7ba73017c11cfe20b9aac61fccc660909f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3719,6 +3719,7 @@ dependencies = [ "cumulus-relay-chain-interface", "futures", "parity-scale-codec", + "parking_lot 0.12.1", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-overseer", diff --git a/cumulus/client/consensus/aura/Cargo.toml b/cumulus/client/consensus/aura/Cargo.toml index c473b2113dd38c7c1d117255600d08851673c395..5ab3e6f2512974db1dc00234ff47faeed6737b70 100644 --- a/cumulus/client/consensus/aura/Cargo.toml +++ b/cumulus/client/consensus/aura/Cargo.toml @@ -13,6 +13,7 @@ workspace = true async-trait = { workspace = true } codec = { features = ["derive"], workspace = true, default-features = true } futures = { workspace = true } +parking_lot = { workspace = true } tracing = { workspace = true, default-features = true } schnellru = { workspace = true } diff --git a/cumulus/client/consensus/aura/src/equivocation_import_queue.rs b/cumulus/client/consensus/aura/src/equivocation_import_queue.rs index be554bdcfc79b986f11da24404a47556031657b2..68f2d37c8748863be879134d3fd0849adf5efb11 100644 --- a/cumulus/client/consensus/aura/src/equivocation_import_queue.rs +++ b/cumulus/client/consensus/aura/src/equivocation_import_queue.rs @@ -21,6 +21,7 @@ /// should be thrown out and which ones should be kept. use codec::Codec; use cumulus_client_consensus_common::ParachainBlockImportMarker; +use parking_lot::Mutex; use schnellru::{ByLength, LruMap}; use sc_consensus::{ @@ -70,7 +71,7 @@ impl NaiveEquivocationDefender { struct Verifier<P, Client, Block, CIDP> { client: Arc<Client>, create_inherent_data_providers: CIDP, - defender: NaiveEquivocationDefender, + defender: Mutex<NaiveEquivocationDefender>, telemetry: Option<TelemetryHandle>, _phantom: std::marker::PhantomData<fn() -> (Block, P)>, } @@ -88,7 +89,7 @@ where CIDP: CreateInherentDataProviders<Block, ()>, { async fn verify( - &mut self, + &self, mut block_params: BlockImportParams<Block>, ) -> Result<BlockImportParams<Block>, String> { // Skip checks that include execution, if being told so, or when importing only state. @@ -137,7 +138,7 @@ where block_params.post_hash = Some(post_hash); // Check for and reject egregious amounts of equivocations. - if self.defender.insert_and_check(slot) { + if self.defender.lock().insert_and_check(slot) { return Err(format!( "Rejecting block {:?} due to excessive equivocations at slot", post_hash, @@ -243,7 +244,7 @@ where let verifier = Verifier::<P, _, _, _> { client, create_inherent_data_providers, - defender: NaiveEquivocationDefender::default(), + defender: Mutex::new(NaiveEquivocationDefender::default()), telemetry, _phantom: std::marker::PhantomData, }; diff --git a/cumulus/client/consensus/common/src/import_queue.rs b/cumulus/client/consensus/common/src/import_queue.rs index 8024b7695a285a414c89cea74561ece91dfd48de..488693604fefccbe2c9b37a22c8ee3c4809383f2 100644 --- a/cumulus/client/consensus/common/src/import_queue.rs +++ b/cumulus/client/consensus/common/src/import_queue.rs @@ -50,7 +50,7 @@ pub struct VerifyNothing; #[async_trait::async_trait] impl<Block: BlockT> Verifier<Block> for VerifyNothing { async fn verify( - &mut self, + &self, params: BlockImportParams<Block>, ) -> Result<BlockImportParams<Block>, String> { Ok(params) diff --git a/cumulus/client/consensus/common/src/lib.rs b/cumulus/client/consensus/common/src/lib.rs index cebe34e7ea58828372a9261e3be94866e119546a..2b0d8290182abd722ebe72a662b45a9564ff273d 100644 --- a/cumulus/client/consensus/common/src/lib.rs +++ b/cumulus/client/consensus/common/src/lib.rs @@ -172,13 +172,13 @@ impl<Block: BlockT, I: Clone, BE> Clone for ParachainBlockImport<Block, I, BE> { impl<Block, BI, BE> BlockImport<Block> for ParachainBlockImport<Block, BI, BE> where Block: BlockT, - BI: BlockImport<Block> + Send, + BI: BlockImport<Block> + Send + Sync, BE: Backend<Block>, { type Error = BI::Error; async fn check_block( - &mut self, + &self, block: sc_consensus::BlockCheckParams<Block>, ) -> Result<sc_consensus::ImportResult, Self::Error> { self.inner.check_block(block).await diff --git a/cumulus/client/consensus/relay-chain/src/import_queue.rs b/cumulus/client/consensus/relay-chain/src/import_queue.rs index 1b521e79d4820fbc4c6709cb177170afa60d3f37..1d6f039da4c123fc79c1132b7b96e93a96c69411 100644 --- a/cumulus/client/consensus/relay-chain/src/import_queue.rs +++ b/cumulus/client/consensus/relay-chain/src/import_queue.rs @@ -52,7 +52,7 @@ where CIDP: CreateInherentDataProviders<Block, ()>, { async fn verify( - &mut self, + &self, mut block_params: BlockImportParams<Block>, ) -> Result<BlockImportParams<Block>, String> { block_params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom( diff --git a/cumulus/polkadot-parachain/src/service.rs b/cumulus/polkadot-parachain/src/service.rs index 9cd3a0037223d6dd4241e5c1b6d3c61f5dff3d3f..a22ae77a77594680f45ee283f266378d0be207de 100644 --- a/cumulus/polkadot-parachain/src/service.rs +++ b/cumulus/polkadot-parachain/src/service.rs @@ -498,43 +498,26 @@ pub async fn start_shell_node<Net: NetworkBackend<Block, Hash>>( .await } -enum BuildOnAccess<R> { - Uninitialized(Option<Box<dyn FnOnce() -> R + Send + Sync>>), - Initialized(R), -} - -impl<R> BuildOnAccess<R> { - fn get_mut(&mut self) -> &mut R { - loop { - match self { - Self::Uninitialized(f) => { - *self = Self::Initialized((f.take().unwrap())()); - }, - Self::Initialized(ref mut r) => return r, - } - } - } -} - struct Verifier<Client, AuraId> { client: Arc<Client>, - aura_verifier: BuildOnAccess<Box<dyn VerifierT<Block>>>, + aura_verifier: Box<dyn VerifierT<Block>>, relay_chain_verifier: Box<dyn VerifierT<Block>>, _phantom: PhantomData<AuraId>, } #[async_trait::async_trait] -impl<Client, AuraId: AuraIdT> VerifierT<Block> for Verifier<Client, AuraId> +impl<Client, AuraId> VerifierT<Block> for Verifier<Client, AuraId> where - Client: sp_api::ProvideRuntimeApi<Block> + Send + Sync, + Client: ProvideRuntimeApi<Block> + Send + Sync, Client::Api: AuraRuntimeApi<Block, AuraId>, + AuraId: AuraIdT + Sync, { async fn verify( - &mut self, + &self, block_import: BlockImportParams<Block>, ) -> Result<BlockImportParams<Block>, String> { if self.client.runtime_api().has_aura_api(*block_import.header.parent_hash()) { - self.aura_verifier.get_mut().verify(block_import).await + self.aura_verifier.verify(block_import).await } else { self.relay_chain_verifier.verify(block_import).await } @@ -543,7 +526,7 @@ where /// Build the import queue for parachain runtimes that started with relay chain consensus and /// switched to aura. -pub fn build_relay_to_aura_import_queue<RuntimeApi, AuraId: AuraIdT>( +pub fn build_relay_to_aura_import_queue<RuntimeApi, AuraId>( client: Arc<ParachainClient<RuntimeApi>>, block_import: ParachainBlockImport<RuntimeApi>, config: &Configuration, @@ -553,38 +536,35 @@ pub fn build_relay_to_aura_import_queue<RuntimeApi, AuraId: AuraIdT>( where RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<RuntimeApi>>, RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId>, + AuraId: AuraIdT + Sync, { let verifier_client = client.clone(); - let aura_verifier = move || { - Box::new(cumulus_client_consensus_aura::build_verifier::< - <AuraId as AppCrypto>::Pair, - _, - _, - _, - >(cumulus_client_consensus_aura::BuildVerifierParams { - client: verifier_client.clone(), - create_inherent_data_providers: move |parent_hash, _| { - let cidp_client = verifier_client.clone(); - async move { - let slot_duration = cumulus_client_consensus_aura::slot_duration_at( - &*cidp_client, - parent_hash, - )?; - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - Ok((slot, timestamp)) - } - }, - telemetry: telemetry_handle, - })) as Box<_> - }; + let aura_verifier = cumulus_client_consensus_aura::build_verifier::< + <AuraId as AppCrypto>::Pair, + _, + _, + _, + >(cumulus_client_consensus_aura::BuildVerifierParams { + client: verifier_client.clone(), + create_inherent_data_providers: move |parent_hash, _| { + let cidp_client = verifier_client.clone(); + async move { + let slot_duration = + cumulus_client_consensus_aura::slot_duration_at(&*cidp_client, parent_hash)?; + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + Ok((slot, timestamp)) + } + }, + telemetry: telemetry_handle, + }); let relay_chain_verifier = Box::new(RelayChainVerifier::new(client.clone(), |_, _| async { Ok(()) })) as Box<_>; @@ -592,7 +572,7 @@ where let verifier = Verifier { client, relay_chain_verifier, - aura_verifier: BuildOnAccess::Uninitialized(Some(Box::new(aura_verifier))), + aura_verifier: Box::new(aura_verifier), _phantom: PhantomData, }; @@ -632,7 +612,7 @@ pub async fn start_generic_aura_lookahead_node<Net: NetworkBackend<Block, Hash>> /// /// Uses the lookahead collator to support async backing. #[sc_tracing::logging::prefix_logs_with("Parachain")] -pub async fn start_asset_hub_lookahead_node<RuntimeApi, AuraId: AuraIdT, Net>( +pub async fn start_asset_hub_lookahead_node<RuntimeApi, AuraId, Net>( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, @@ -644,6 +624,7 @@ where RuntimeApi::RuntimeApi: AuraRuntimeApi<Block, AuraId> + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance> + substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>, + AuraId: AuraIdT + Sync, Net: NetworkBackend<Block, Hash>, { start_node_impl::<RuntimeApi, _, _, _, Net>( diff --git a/prdoc/pr_4844.prdoc b/prdoc/pr_4844.prdoc new file mode 100644 index 0000000000000000000000000000000000000000..999e63c84ed9a0491c6b4683a5ed3d5b25efa8c2 --- /dev/null +++ b/prdoc/pr_4844.prdoc @@ -0,0 +1,34 @@ +title: Make `Verifier::verify` and `BlockImport::check_block` use `&self` instead of `&mut self` + +doc: + - audience: Node Dev + description: | + `Verifier::verify` and `BlockImport::check_block` were refactored to use `&self` instead of `&mut self` + because there is no fundamental requirement for those operations to be exclusive in nature. + +crates: +- name: sc-consensus + bump: major + validate: false +- name: sc-consensus-aura + bump: major +- name: sc-consensus-babe + bump: major +- name: sc-consensus-beefy + bump: major +- name: sc-consensus-grandpa + bump: major +- name: sc-consensus-manual-seal + bump: major +- name: sc-consensus-pow + bump: major +- name: sc-service + bump: major +- name: cumulus-client-consensus-common + bump: major +- name: cumulus-client-consensus-aura + bump: major +- name: cumulus-client-consensus-relay-chain + bump: major +- name: polkadot-parachain-bin + validate: false diff --git a/substrate/client/consensus/aura/src/import_queue.rs b/substrate/client/consensus/aura/src/import_queue.rs index a8777ef8788cc3a247d6d09c146f61bf4cb23e62..79f4faa5ebf97657b0f6d02933a1e7f8421d4f56 100644 --- a/substrate/client/consensus/aura/src/import_queue.rs +++ b/substrate/client/consensus/aura/src/import_queue.rs @@ -174,7 +174,7 @@ where CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync, { async fn verify( - &mut self, + &self, mut block: BlockImportParams<B>, ) -> Result<BlockImportParams<B>, String> { // Skip checks that include execution, if being told so or when importing only state. diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index 0c85de24004031fce96be35bc413506069093eb1..0c1eb88758644c0d661c25c4feb670c6054781f4 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -1128,7 +1128,7 @@ where CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync, { async fn verify( - &mut self, + &self, mut block: BlockImportParams<Block>, ) -> Result<BlockImportParams<Block>, String> { trace!( @@ -1681,7 +1681,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams<Block>, ) -> Result<ImportResult, Self::Error> { self.inner.check_block(block).await.map_err(Into::into) diff --git a/substrate/client/consensus/babe/src/tests.rs b/substrate/client/consensus/babe/src/tests.rs index 716067ae4000661beab6aeb90772087720d0a5ae..6f805188b9a42d806f45c01715f1a088770c3bfe 100644 --- a/substrate/client/consensus/babe/src/tests.rs +++ b/substrate/client/consensus/babe/src/tests.rs @@ -143,11 +143,11 @@ thread_local! { pub struct PanickingBlockImport<B>(B); #[async_trait::async_trait] -impl<B: BlockImport<TestBlock>> BlockImport<TestBlock> for PanickingBlockImport<B> +impl<BI> BlockImport<TestBlock> for PanickingBlockImport<BI> where - B: Send, + BI: BlockImport<TestBlock> + Send + Sync, { - type Error = B::Error; + type Error = BI::Error; async fn import_block( &mut self, @@ -157,7 +157,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams<TestBlock>, ) -> Result<ImportResult, Self::Error> { Ok(self.0.check_block(block).await.expect("checking block failed")) @@ -198,7 +198,7 @@ impl Verifier<TestBlock> for TestVerifier { /// new set of validators to import. If not, err with an Error-Message /// presented to the User in the logs. async fn verify( - &mut self, + &self, mut block: BlockImportParams<TestBlock>, ) -> Result<BlockImportParams<TestBlock>, String> { // apply post-sealing mutations (i.e. stripping seal, if desired). diff --git a/substrate/client/consensus/beefy/src/import.rs b/substrate/client/consensus/beefy/src/import.rs index c01fb3db4845eb9e413e0dbe781ae395a3b31217..8480268529338fe09333719a2e26141e37c792ea 100644 --- a/substrate/client/consensus/beefy/src/import.rs +++ b/substrate/client/consensus/beefy/src/import.rs @@ -192,7 +192,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams<Block>, ) -> Result<ImportResult, Self::Error> { self.inner.check_block(block).await diff --git a/substrate/client/consensus/common/src/block_import.rs b/substrate/client/consensus/common/src/block_import.rs index d91851aea62cf4564464b67bcd0bdcd5d712e139..c5adbb5a5fca0634b1cdb038c569a1de312cf859 100644 --- a/substrate/client/consensus/common/src/block_import.rs +++ b/substrate/client/consensus/common/src/block_import.rs @@ -307,10 +307,7 @@ pub trait BlockImport<B: BlockT> { type Error: std::error::Error + Send + 'static; /// Check block preconditions. - async fn check_block( - &mut self, - block: BlockCheckParams<B>, - ) -> Result<ImportResult, Self::Error>; + async fn check_block(&self, block: BlockCheckParams<B>) -> Result<ImportResult, Self::Error>; /// Import a block. async fn import_block( @@ -324,10 +321,7 @@ impl<B: BlockT> BlockImport<B> for crate::import_queue::BoxBlockImport<B> { type Error = sp_consensus::error::Error; /// Check block preconditions. - async fn check_block( - &mut self, - block: BlockCheckParams<B>, - ) -> Result<ImportResult, Self::Error> { + async fn check_block(&self, block: BlockCheckParams<B>) -> Result<ImportResult, Self::Error> { (**self).check_block(block).await } @@ -348,10 +342,7 @@ where { type Error = E; - async fn check_block( - &mut self, - block: BlockCheckParams<B>, - ) -> Result<ImportResult, Self::Error> { + async fn check_block(&self, block: BlockCheckParams<B>) -> Result<ImportResult, Self::Error> { (&**self).check_block(block).await } diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 1ddda04126a99e595d306fd36e27fc4edd3fbd6c..35fc8ad4a402e72d9f6c8d110a468bea5f3c5195 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -28,6 +28,10 @@ //! queues to be instantiated simply. use log::{debug, trace}; +use std::{ + fmt, + time::{Duration, Instant}, +}; use sp_consensus::{error::Error as ConsensusError, BlockOrigin}; use sp_runtime::{ @@ -93,11 +97,10 @@ pub struct IncomingBlock<B: BlockT> { /// Verify a justification of a block #[async_trait::async_trait] -pub trait Verifier<B: BlockT>: Send { +pub trait Verifier<B: BlockT>: Send + Sync { /// Verify the given block data and return the `BlockImportParams` to /// continue the block import process. - async fn verify(&mut self, block: BlockImportParams<B>) - -> Result<BlockImportParams<B>, String>; + async fn verify(&self, block: BlockImportParams<B>) -> Result<BlockImportParams<B>, String>; } /// Blocks import queue API. @@ -166,16 +169,16 @@ pub trait Link<B: BlockT>: Send { /// Block import successful result. #[derive(Debug, PartialEq)] -pub enum BlockImportStatus<N: std::fmt::Debug + PartialEq> { +pub enum BlockImportStatus<BlockNumber: fmt::Debug + PartialEq> { /// Imported known block. - ImportedKnown(N, Option<RuntimeOrigin>), + ImportedKnown(BlockNumber, Option<RuntimeOrigin>), /// Imported unknown block. - ImportedUnknown(N, ImportedAux, Option<RuntimeOrigin>), + ImportedUnknown(BlockNumber, ImportedAux, Option<RuntimeOrigin>), } -impl<N: std::fmt::Debug + PartialEq> BlockImportStatus<N> { +impl<BlockNumber: fmt::Debug + PartialEq> BlockImportStatus<BlockNumber> { /// Returns the imported block number. - pub fn number(&self) -> &N { + pub fn number(&self) -> &BlockNumber { match self { BlockImportStatus::ImportedKnown(n, _) | BlockImportStatus::ImportedUnknown(n, _, _) => n, @@ -224,44 +227,30 @@ pub async fn import_single_block<B: BlockT, V: Verifier<B>>( block: IncomingBlock<B>, verifier: &mut V, ) -> BlockImportResult<B> { - import_single_block_metered(import_handle, block_origin, block, verifier, None).await + match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? { + SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status), + SingleBlockVerificationOutcome::Verified(import_parameters) => + import_single_block_metered(import_handle, import_parameters, None).await, + } } -/// Single block import function with metering. -pub(crate) async fn import_single_block_metered<B: BlockT, V: Verifier<B>>( - import_handle: &mut impl BlockImport<B, Error = ConsensusError>, - block_origin: BlockOrigin, - block: IncomingBlock<B>, - verifier: &mut V, - metrics: Option<Metrics>, -) -> BlockImportResult<B> { - let peer = block.origin; - - let (header, justifications) = match (block.header, block.justifications) { - (Some(header), justifications) => (header, justifications), - (None, _) => { - if let Some(ref peer) = peer { - debug!(target: LOG_TARGET, "Header {} was not provided by {} ", block.hash, peer); - } else { - debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash); - } - return Err(BlockImportError::IncompleteHeader(peer)) - }, - }; - - trace!(target: LOG_TARGET, "Header {} has {:?} logs", block.hash, header.digest().logs().len()); - - let number = *header.number(); - let hash = block.hash; - let parent_hash = *header.parent_hash(); - - let import_handler = |import| match import { +fn import_handler<Block>( + number: NumberFor<Block>, + hash: Block::Hash, + parent_hash: Block::Hash, + block_origin: Option<RuntimeOrigin>, + import: Result<ImportResult, ConsensusError>, +) -> Result<BlockImportStatus<NumberFor<Block>>, BlockImportError> +where + Block: BlockT, +{ + match import { Ok(ImportResult::AlreadyInChain) => { trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash); - Ok(BlockImportStatus::ImportedKnown(number, peer)) + Ok(BlockImportStatus::ImportedKnown(number, block_origin)) }, Ok(ImportResult::Imported(aux)) => - Ok(BlockImportStatus::ImportedUnknown(number, aux, peer)), + Ok(BlockImportStatus::ImportedUnknown(number, aux, block_origin)), Ok(ImportResult::MissingState) => { debug!( target: LOG_TARGET, @@ -278,15 +267,60 @@ pub(crate) async fn import_single_block_metered<B: BlockT, V: Verifier<B>>( }, Ok(ImportResult::KnownBad) => { debug!(target: LOG_TARGET, "Peer gave us a bad block {}: {:?}", number, hash); - Err(BlockImportError::BadBlock(peer)) + Err(BlockImportError::BadBlock(block_origin)) }, Err(e) => { debug!(target: LOG_TARGET, "Error importing block {}: {:?}: {}", number, hash, e); Err(BlockImportError::Other(e)) }, + } +} + +pub(crate) enum SingleBlockVerificationOutcome<Block: BlockT> { + /// Block is already imported. + Imported(BlockImportStatus<NumberFor<Block>>), + /// Block is verified, but needs to be imported. + Verified(SingleBlockImportParameters<Block>), +} + +pub(crate) struct SingleBlockImportParameters<Block: BlockT> { + import_block: BlockImportParams<Block>, + hash: Block::Hash, + block_origin: Option<RuntimeOrigin>, + verification_time: Duration, +} + +/// Single block import function with metering. +pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>( + import_handle: &impl BlockImport<B, Error = ConsensusError>, + block_origin: BlockOrigin, + block: IncomingBlock<B>, + verifier: &mut V, + metrics: Option<&Metrics>, +) -> Result<SingleBlockVerificationOutcome<B>, BlockImportError> { + let peer = block.origin; + let justifications = block.justifications; + + let Some(header) = block.header else { + if let Some(ref peer) = peer { + debug!(target: LOG_TARGET, "Header {} was not provided by {peer} ", block.hash); + } else { + debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash); + } + return Err(BlockImportError::IncompleteHeader(peer)) }; - match import_handler( + trace!(target: LOG_TARGET, "Header {} has {:?} logs", block.hash, header.digest().logs().len()); + + let number = *header.number(); + let hash = block.hash; + let parent_hash = *header.parent_hash(); + + match import_handler::<B>( + number, + hash, + parent_hash, + peer, import_handle .check_block(BlockCheckParams { hash, @@ -299,10 +333,13 @@ pub(crate) async fn import_single_block_metered<B: BlockT, V: Verifier<B>>( .await, )? { BlockImportStatus::ImportedUnknown { .. } => (), - r => return Ok(r), // Any other successful result means that the block is already imported. + r => { + // Any other successful result means that the block is already imported. + return Ok(SingleBlockVerificationOutcome::Imported(r)) + }, } - let started = std::time::Instant::now(); + let started = Instant::now(); let mut import_block = BlockImportParams::new(block_origin, header); import_block.body = block.body; @@ -333,19 +370,42 @@ pub(crate) async fn import_single_block_metered<B: BlockT, V: Verifier<B>>( } else { trace!(target: LOG_TARGET, "Verifying {}({}) failed: {}", number, hash, msg); } - if let Some(metrics) = metrics.as_ref() { + if let Some(metrics) = metrics { metrics.report_verification(false, started.elapsed()); } BlockImportError::VerificationFailed(peer, msg) })?; - if let Some(metrics) = metrics.as_ref() { - metrics.report_verification(true, started.elapsed()); + let verification_time = started.elapsed(); + if let Some(metrics) = metrics { + metrics.report_verification(true, verification_time); } + Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters { + import_block, + hash, + block_origin: peer, + verification_time, + })) +} + +pub(crate) async fn import_single_block_metered<Block: BlockT>( + import_handle: &mut impl BlockImport<Block, Error = ConsensusError>, + import_parameters: SingleBlockImportParameters<Block>, + metrics: Option<&Metrics>, +) -> BlockImportResult<Block> { + let started = Instant::now(); + + let SingleBlockImportParameters { import_block, hash, block_origin, verification_time } = + import_parameters; + + let number = *import_block.header.number(); + let parent_hash = *import_block.header.parent_hash(); + let imported = import_handle.import_block(import_block).await; - if let Some(metrics) = metrics.as_ref() { - metrics.report_verification_and_import(started.elapsed()); + if let Some(metrics) = metrics { + metrics.report_verification_and_import(started.elapsed() + verification_time); } - import_handler(imported) + + import_handler::<Block>(number, hash, parent_hash, block_origin, imported) } diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index e5eac3896cc8c0602dcb2451f860635424dc2f26..05f2b252796146f62ba7c258b208fa4c2eb4a3d1 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -32,9 +32,9 @@ use std::pin::Pin; use crate::{ import_queue::{ buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender}, - import_single_block_metered, BlockImportError, BlockImportStatus, BoxBlockImport, - BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, Link, - RuntimeOrigin, Verifier, LOG_TARGET, + import_single_block_metered, verify_single_block_metered, BlockImportError, + BlockImportStatus, BoxBlockImport, BoxJustificationImport, ImportQueue, ImportQueueService, + IncomingBlock, Link, RuntimeOrigin, SingleBlockVerificationOutcome, Verifier, LOG_TARGET, }, metrics::Metrics, }; @@ -60,13 +60,16 @@ impl<B: BlockT> BasicQueue<B> { /// Instantiate a new basic queue, with given verifier. /// /// This creates a background task, and calls `on_start` on the justification importer. - pub fn new<V: 'static + Verifier<B>>( + pub fn new<V>( verifier: V, block_import: BoxBlockImport<B>, justification_import: Option<BoxJustificationImport<B>>, spawner: &impl sp_core::traits::SpawnEssentialNamed, prometheus_registry: Option<&Registry>, - ) -> Self { + ) -> Self + where + V: Verifier<B> + 'static, + { let (result_sender, result_port) = buffered_link::buffered_link(100_000); let metrics = prometheus_registry.and_then(|r| { @@ -252,7 +255,7 @@ struct BlockImportWorker<B: BlockT> { } impl<B: BlockT> BlockImportWorker<B> { - fn new<V: 'static + Verifier<B>>( + fn new<V>( result_sender: BufferedLinkSender<B>, verifier: V, block_import: BoxBlockImport<B>, @@ -262,7 +265,10 @@ impl<B: BlockT> BlockImportWorker<B> { impl Future<Output = ()> + Send, TracingUnboundedSender<worker_messages::ImportJustification<B>>, TracingUnboundedSender<worker_messages::ImportBlocks<B>>, - ) { + ) + where + V: Verifier<B> + 'static, + { use worker_messages::*; let (justification_sender, mut justification_port) = @@ -419,15 +425,22 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>( let import_result = if has_error { Err(BlockImportError::Cancelled) } else { - // The actual import. - import_single_block_metered( + let verification_fut = verify_single_block_metered( import_handle, blocks_origin, block, verifier, - metrics.clone(), - ) - .await + metrics.as_ref(), + ); + match verification_fut.await { + Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status), + Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => { + // The actual import. + import_single_block_metered(import_handle, import_parameters, metrics.as_ref()) + .await + }, + Err(e) => Err(e), + } }; if let Some(metrics) = metrics.as_ref() { @@ -494,7 +507,7 @@ mod tests { #[async_trait::async_trait] impl Verifier<Block> for () { async fn verify( - &mut self, + &self, block: BlockImportParams<Block>, ) -> Result<BlockImportParams<Block>, String> { Ok(BlockImportParams::new(block.origin, block.header)) @@ -506,7 +519,7 @@ mod tests { type Error = sp_consensus::Error; async fn check_block( - &mut self, + &self, _block: BlockCheckParams<Block>, ) -> Result<ImportResult, Self::Error> { Ok(ImportResult::imported(false)) diff --git a/substrate/client/consensus/grandpa/src/import.rs b/substrate/client/consensus/grandpa/src/import.rs index b594c0f678cea64efae7980ee17a9876ce6d5013..8b7b02f180ecd582063c5b02e1f4217c0a507e13 100644 --- a/substrate/client/consensus/grandpa/src/import.rs +++ b/substrate/client/consensus/grandpa/src/import.rs @@ -518,7 +518,7 @@ where Client: ClientForGrandpa<Block, BE>, Client::Api: GrandpaApi<Block>, for<'a> &'a Client: BlockImport<Block, Error = ConsensusError>, - SC: Send, + SC: Send + Sync, { type Error = ConsensusError; @@ -697,7 +697,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams<Block>, ) -> Result<ImportResult, Self::Error> { self.inner.check_block(block).await diff --git a/substrate/client/consensus/manual-seal/src/consensus/babe.rs b/substrate/client/consensus/manual-seal/src/consensus/babe.rs index bc56ce0227142fee2c001c39ce8d31cd9e6fb9b5..a68e46f0134d655d1b034b2b4a40727627724164 100644 --- a/substrate/client/consensus/manual-seal/src/consensus/babe.rs +++ b/substrate/client/consensus/manual-seal/src/consensus/babe.rs @@ -96,7 +96,7 @@ where C: HeaderBackend<B> + HeaderMetadata<B, Error = sp_blockchain::Error>, { async fn verify( - &mut self, + &self, mut import_params: BlockImportParams<B>, ) -> Result<BlockImportParams<B>, String> { import_params.finalized = false; diff --git a/substrate/client/consensus/manual-seal/src/lib.rs b/substrate/client/consensus/manual-seal/src/lib.rs index 8fc7e7ecab2f45cf8359c2f449dde3b480bb3ad3..39f8f8609d8d7f35867cc8108ad6667263fe5b74 100644 --- a/substrate/client/consensus/manual-seal/src/lib.rs +++ b/substrate/client/consensus/manual-seal/src/lib.rs @@ -65,7 +65,7 @@ struct ManualSealVerifier; #[async_trait::async_trait] impl<B: BlockT> Verifier<B> for ManualSealVerifier { async fn verify( - &mut self, + &self, mut block: BlockImportParams<B>, ) -> Result<BlockImportParams<B>, String> { block.finalized = false; diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs index ee5c1dfc6f11a26599c0f01efee9224caded43cd..50e9533abb36ab24e5d4942d154a378f84c4beec 100644 --- a/substrate/client/consensus/pow/src/lib.rs +++ b/substrate/client/consensus/pow/src/lib.rs @@ -312,10 +312,7 @@ where { type Error = ConsensusError; - async fn check_block( - &mut self, - block: BlockCheckParams<B>, - ) -> Result<ImportResult, Self::Error> { + async fn check_block(&self, block: BlockCheckParams<B>) -> Result<ImportResult, Self::Error> { self.inner.check_block(block).await.map_err(Into::into) } @@ -442,7 +439,7 @@ where Algorithm::Difficulty: 'static + Send, { async fn verify( - &mut self, + &self, mut block: BlockImportParams<B>, ) -> Result<BlockImportParams<B>, String> { let hash = block.header.hash(); diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 8a8f9608051af0bcb523b405585ad7c0b81b4ae2..221c8515d6d416a691aaf59128299173064af295 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -114,7 +114,7 @@ impl PassThroughVerifier { #[async_trait::async_trait] impl<B: BlockT> Verifier<B> for PassThroughVerifier { async fn verify( - &mut self, + &self, mut block: BlockImportParams<B>, ) -> Result<BlockImportParams<B>, String> { if block.fork_choice.is_none() { @@ -210,7 +210,7 @@ impl BlockImport<Block> for PeersClient { type Error = ConsensusError; async fn check_block( - &mut self, + &self, block: BlockCheckParams<Block>, ) -> Result<ImportResult, Self::Error> { self.client.check_block(block).await @@ -600,7 +600,7 @@ where type Error = ConsensusError; async fn check_block( - &mut self, + &self, block: BlockCheckParams<Block>, ) -> Result<ImportResult, Self::Error> { self.inner.check_block(block).await @@ -622,10 +622,7 @@ struct VerifierAdapter<B: BlockT> { #[async_trait::async_trait] impl<B: BlockT> Verifier<B> for VerifierAdapter<B> { - async fn verify( - &mut self, - block: BlockImportParams<B>, - ) -> Result<BlockImportParams<B>, String> { + async fn verify(&self, block: BlockImportParams<B>) -> Result<BlockImportParams<B>, String> { let hash = block.header.hash(); self.verifier.lock().await.verify(block).await.map_err(|e| { self.failed_verifications.lock().insert(hash, e.clone()); diff --git a/substrate/client/network/test/src/service.rs b/substrate/client/network/test/src/service.rs index 150c1db7560e6d589e21e9aecb2add06fc57a47c..c4a2b261081e6b101bf4620fd379996b3ae32a69 100644 --- a/substrate/client/network/test/src/service.rs +++ b/substrate/client/network/test/src/service.rs @@ -134,7 +134,7 @@ impl TestNetworkBuilder { #[async_trait::async_trait] impl<B: BlockT> sc_consensus::Verifier<B> for PassThroughVerifier { async fn verify( - &mut self, + &self, mut block: sc_consensus::BlockImportParams<B>, ) -> Result<sc_consensus::BlockImportParams<B>, String> { block.finalized = self.0; diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index 2fbcc3ba4f7569863124f55e3712a88af1109fff..a2c9212f7b9c9ce80860aa9ca9f3b9e8b0a844d5 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -1780,7 +1780,7 @@ where /// Check block preconditions. async fn check_block( - &mut self, + &self, block: BlockCheckParams<Block>, ) -> Result<ImportResult, Self::Error> { let BlockCheckParams { @@ -1862,10 +1862,10 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams<Block>, ) -> Result<ImportResult, Self::Error> { - (&*self).check_block(block).await + (&self).check_block(block).await } }