diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs index 1f5781434ef71042660cc130301e38d2359e68c7..7b1012888e86913f2e490a0000b83effe1accd91 100644 --- a/substrate/client/consensus/pow/src/lib.rs +++ b/substrate/client/consensus/pow/src/lib.rs @@ -41,13 +41,12 @@ mod worker; -pub use crate::worker::{MiningBuild, MiningMetadata, MiningWorker}; +pub use crate::worker::{MiningBuild, MiningHandle, MiningMetadata}; use crate::worker::UntilImportedOrTimeout; use codec::{Decode, Encode}; use futures::{Future, StreamExt}; use log::*; -use parking_lot::Mutex; use prometheus_endpoint::Registry; use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents}; use sc_consensus::{ @@ -525,7 +524,7 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>( build_time: Duration, can_author_with: CAW, ) -> ( - Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>, + MiningHandle<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>, impl Future<Output = ()>, ) where @@ -543,12 +542,7 @@ where CAW: CanAuthorWith<Block> + Clone + Send + 'static, { let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout); - let worker = Arc::new(Mutex::new(MiningWorker { - build: None, - algorithm: algorithm.clone(), - block_import, - justification_sync_link, - })); + let worker = MiningHandle::new(algorithm.clone(), block_import, justification_sync_link); let worker_ret = worker.clone(); let task = async move { @@ -559,7 +553,7 @@ where if sync_oracle.is_major_syncing() { debug!(target: "pow", "Skipping proposal due to sync."); - worker.lock().on_major_syncing(); + worker.on_major_syncing(); continue } @@ -587,7 +581,7 @@ where continue } - if worker.lock().best_hash() == Some(best_hash) { + if worker.best_hash() == Some(best_hash) { continue } @@ -682,7 +676,7 @@ where proposal, }; - worker.lock().on_build(build); + worker.on_build(build); } }; diff --git a/substrate/client/consensus/pow/src/worker.rs b/substrate/client/consensus/pow/src/worker.rs index c0ca16ccad3aa3905df1f6855ee06cb5386c9d72..3faa18ece3188bb46c2a9bc605416a72451934ca 100644 --- a/substrate/client/consensus/pow/src/worker.rs +++ b/substrate/client/consensus/pow/src/worker.rs @@ -22,6 +22,7 @@ use futures::{ }; use futures_timer::Delay; use log::*; +use parking_lot::Mutex; use sc_client_api::ImportNotifications; use sc_consensus::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges}; use sp_consensus::{BlockOrigin, Proposal}; @@ -30,7 +31,16 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT}, DigestItem, }; -use std::{borrow::Cow, collections::HashMap, pin::Pin, time::Duration}; +use std::{ + borrow::Cow, + collections::HashMap, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; use crate::{PowAlgorithm, PowIntermediate, Seal, INTERMEDIATE_KEY, POW_ENGINE_ID}; @@ -60,21 +70,26 @@ pub struct MiningBuild< pub proposal: Proposal<Block, sp_api::TransactionFor<C, Block>, Proof>, } +/// Version of the mining worker. +#[derive(Eq, PartialEq, Clone, Copy)] +pub struct Version(usize); + /// Mining worker that exposes structs to query the current mining build and submit mined blocks. -pub struct MiningWorker< +pub struct MiningHandle< Block: BlockT, Algorithm: PowAlgorithm<Block>, C: sp_api::ProvideRuntimeApi<Block>, L: sc_consensus::JustificationSyncLink<Block>, Proof, > { - pub(crate) build: Option<MiningBuild<Block, Algorithm, C, Proof>>, - pub(crate) algorithm: Algorithm, - pub(crate) block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>, - pub(crate) justification_sync_link: L, + version: Arc<AtomicUsize>, + algorithm: Arc<Algorithm>, + justification_sync_link: Arc<L>, + build: Arc<Mutex<Option<MiningBuild<Block, Algorithm, C, Proof>>>>, + block_import: Arc<Mutex<BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>>>, } -impl<Block, Algorithm, C, L, Proof> MiningWorker<Block, Algorithm, C, L, Proof> +impl<Block, Algorithm, C, L, Proof> MiningHandle<Block, Algorithm, C, L, Proof> where Block: BlockT, C: sp_api::ProvideRuntimeApi<Block>, @@ -83,35 +98,65 @@ where L: sc_consensus::JustificationSyncLink<Block>, sp_api::TransactionFor<C, Block>: Send + 'static, { - /// Get the current best hash. `None` if the worker has just started or the client is doing - /// major syncing. - pub fn best_hash(&self) -> Option<Block::Hash> { - self.build.as_ref().map(|b| b.metadata.best_hash) + fn increment_version(&self) { + self.version.fetch_add(1, Ordering::SeqCst); } - pub(crate) fn on_major_syncing(&mut self) { - self.build = None; + pub(crate) fn new( + algorithm: Algorithm, + block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>, + justification_sync_link: L, + ) -> Self { + Self { + version: Arc::new(AtomicUsize::new(0)), + algorithm: Arc::new(algorithm), + justification_sync_link: Arc::new(justification_sync_link), + build: Arc::new(Mutex::new(None)), + block_import: Arc::new(Mutex::new(block_import)), + } } - pub(crate) fn on_build(&mut self, build: MiningBuild<Block, Algorithm, C, Proof>) { - self.build = Some(build); + pub(crate) fn on_major_syncing(&self) { + let mut build = self.build.lock(); + *build = None; + self.increment_version(); + } + + pub(crate) fn on_build(&self, value: MiningBuild<Block, Algorithm, C, Proof>) { + let mut build = self.build.lock(); + *build = Some(value); + self.increment_version(); + } + + /// Get the version of the mining worker. + /// + /// This returns type `Version` which can only compare equality. If `Version` is unchanged, then + /// it can be certain that `best_hash` and `metadata` were not changed. + pub fn version(&self) -> Version { + Version(self.version.load(Ordering::SeqCst)) + } + + /// Get the current best hash. `None` if the worker has just started or the client is doing + /// major syncing. + pub fn best_hash(&self) -> Option<Block::Hash> { + self.build.lock().as_ref().map(|b| b.metadata.best_hash) } /// Get a copy of the current mining metadata, if available. pub fn metadata(&self) -> Option<MiningMetadata<Block::Hash, Algorithm::Difficulty>> { - self.build.as_ref().map(|b| b.metadata.clone()) + self.build.lock().as_ref().map(|b| b.metadata.clone()) } /// Submit a mined seal. The seal will be validated again. Returns true if the submission is /// successful. - pub async fn submit(&mut self, seal: Seal) -> bool { - if let Some(build) = self.build.take() { + pub async fn submit(&self, seal: Seal) -> bool { + if let Some(metadata) = self.metadata() { match self.algorithm.verify( - &BlockId::Hash(build.metadata.best_hash), - &build.metadata.pre_hash, - build.metadata.pre_runtime.as_ref().map(|v| &v[..]), + &BlockId::Hash(metadata.best_hash), + &metadata.pre_hash, + metadata.pre_runtime.as_ref().map(|v| &v[..]), &seal, - build.metadata.difficulty, + metadata.difficulty, ) { Ok(true) => (), Ok(false) => { @@ -130,55 +175,92 @@ where return false }, } + } else { + warn!( + target: "pow", + "Unable to import mined block: metadata does not exist", + ); + return false + } - let seal = DigestItem::Seal(POW_ENGINE_ID, seal); - let (header, body) = build.proposal.block.deconstruct(); - - let mut import_block = BlockImportParams::new(BlockOrigin::Own, header); - import_block.post_digests.push(seal); - import_block.body = Some(body); - import_block.state_action = - StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes)); - - let intermediate = PowIntermediate::<Algorithm::Difficulty> { - difficulty: Some(build.metadata.difficulty), - }; - - import_block - .intermediates - .insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>); - - let header = import_block.post_header(); - match self.block_import.import_block(import_block, HashMap::default()).await { - Ok(res) => { - res.handle_justification( - &header.hash(), - *header.number(), - &mut self.justification_sync_link, - ); - - info!( - target: "pow", - "✅ Successfully mined block on top of: {}", - build.metadata.best_hash - ); - true - }, - Err(err) => { - warn!( - target: "pow", - "Unable to import mined block: {:?}", - err, - ); - false - }, + let build = if let Some(build) = { + let mut build = self.build.lock(); + let value = build.take(); + if value.is_some() { + self.increment_version(); } + value + } { + build } else { warn!( target: "pow", "Unable to import mined block: build does not exist", ); - false + return false + }; + + let seal = DigestItem::Seal(POW_ENGINE_ID, seal); + let (header, body) = build.proposal.block.deconstruct(); + + let mut import_block = BlockImportParams::new(BlockOrigin::Own, header); + import_block.post_digests.push(seal); + import_block.body = Some(body); + import_block.state_action = + StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes)); + + let intermediate = PowIntermediate::<Algorithm::Difficulty> { + difficulty: Some(build.metadata.difficulty), + }; + + import_block + .intermediates + .insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>); + + let header = import_block.post_header(); + let mut block_import = self.block_import.lock(); + + match block_import.import_block(import_block, HashMap::default()).await { + Ok(res) => { + res.handle_justification( + &header.hash(), + *header.number(), + &self.justification_sync_link, + ); + + info!( + target: "pow", + "✅ Successfully mined block on top of: {}", + build.metadata.best_hash + ); + true + }, + Err(err) => { + warn!( + target: "pow", + "Unable to import mined block: {:?}", + err, + ); + false + }, + } + } +} + +impl<Block, Algorithm, C, L, Proof> Clone for MiningHandle<Block, Algorithm, C, L, Proof> +where + Block: BlockT, + Algorithm: PowAlgorithm<Block>, + C: sp_api::ProvideRuntimeApi<Block>, + L: sc_consensus::JustificationSyncLink<Block>, +{ + fn clone(&self) -> Self { + Self { + version: self.version.clone(), + algorithm: self.algorithm.clone(), + justification_sync_link: self.justification_sync_link.clone(), + build: self.build.clone(), + block_import: self.block_import.clone(), } } }