// This file is part of Substrate. // Copyright (C) Parity Technologies (UK) Ltd. // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with this program. If not, see . //! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks. //! This is suitable for a testing environment. use futures::prelude::*; use futures_timer::Delay; use prometheus_endpoint::Registry; use sc_client_api::{ backend::{Backend as ClientBackend, Finalizer}, client::BlockchainEvents, }; use sc_consensus::{ block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy}, import_queue::{BasicQueue, BoxBlockImport, Verifier}, }; use sp_blockchain::HeaderBackend; use sp_consensus::{Environment, Proposer, SelectChain}; use sp_core::traits::SpawnNamed; use sp_inherents::CreateInherentDataProviders; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::{marker::PhantomData, sync::Arc, time::Duration}; mod error; mod finalize_block; mod seal_block; pub mod consensus; pub mod rpc; pub use self::{ consensus::ConsensusDataProvider, error::Error, finalize_block::{finalize_block, FinalizeBlockParams}, rpc::{CreatedBlock, EngineCommand}, seal_block::{seal_block, SealBlockParams, MAX_PROPOSAL_DURATION}, }; use sc_transaction_pool_api::TransactionPool; use sp_api::ProvideRuntimeApi; const LOG_TARGET: &str = "manual-seal"; /// The `ConsensusEngineId` of Manual Seal. pub const MANUAL_SEAL_ENGINE_ID: ConsensusEngineId = [b'm', b'a', b'n', b'l']; /// The verifier for the manual seal engine; instantly finalizes. struct ManualSealVerifier; #[async_trait::async_trait] impl Verifier for ManualSealVerifier { async fn verify( &mut self, mut block: BlockImportParams, ) -> Result, String> { block.finalized = false; block.fork_choice = Some(ForkChoiceStrategy::LongestChain); Ok(block) } } /// Instantiate the import queue for the manual seal consensus engine. pub fn import_queue( block_import: BoxBlockImport, spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, ) -> BasicQueue where Block: BlockT, { BasicQueue::new(ManualSealVerifier, block_import, None, spawner, registry) } /// Params required to start the manual sealing authorship task. pub struct ManualSealParams, TP, SC, CS, CIDP, P> { /// Block import instance. pub block_import: BI, /// The environment we are producing blocks for. pub env: E, /// Client instance pub client: Arc, /// Shared reference to the transaction pool. pub pool: Arc, /// Stream, Basically the receiving end of a channel for sending /// commands to the authorship task. pub commands_stream: CS, /// SelectChain strategy. pub select_chain: SC, /// Digest provider for inclusion in blocks. pub consensus_data_provider: Option>>, /// Something that can create the inherent data providers. pub create_inherent_data_providers: CIDP, } /// Params required to start the instant sealing authorship task. pub struct InstantSealParams, TP, SC, CIDP, P> { /// Block import instance for well. importing blocks. pub block_import: BI, /// The environment we are producing blocks for. pub env: E, /// Client instance pub client: Arc, /// Shared reference to the transaction pool. pub pool: Arc, /// SelectChain strategy. pub select_chain: SC, /// Digest provider for inclusion in blocks. pub consensus_data_provider: Option>>, /// Something that can create the inherent data providers. pub create_inherent_data_providers: CIDP, } /// Params required to start the delayed finalization task. pub struct DelayedFinalizeParams { /// Block import instance. pub client: Arc, /// Handle for spawning delayed finalization tasks. pub spawn_handle: S, /// The delay in seconds before a block is finalized. pub delay_sec: u64, } /// Creates the background authorship task for the manually seal engine. pub async fn run_manual_seal( ManualSealParams { mut block_import, mut env, client, pool, mut commands_stream, select_chain, consensus_data_provider, create_inherent_data_providers, }: ManualSealParams, ) where B: BlockT + 'static, BI: BlockImport + Send + Sync + 'static, C: HeaderBackend + Finalizer + ProvideRuntimeApi + 'static, CB: ClientBackend + 'static, E: Environment + 'static, E::Proposer: Proposer, CS: Stream::Hash>> + Unpin + 'static, SC: SelectChain + 'static, TP: TransactionPool, CIDP: CreateInherentDataProviders, P: codec::Encode + Send + Sync + 'static, { while let Some(command) = commands_stream.next().await { match command { EngineCommand::SealNewBlock { create_empty, finalize, parent_hash, sender } => { seal_block(SealBlockParams { sender, parent_hash, finalize, create_empty, env: &mut env, select_chain: &select_chain, block_import: &mut block_import, consensus_data_provider: consensus_data_provider.as_deref(), pool: pool.clone(), client: client.clone(), create_inherent_data_providers: &create_inherent_data_providers, }) .await; }, EngineCommand::FinalizeBlock { hash, sender, justification } => { let justification = justification.map(|j| (MANUAL_SEAL_ENGINE_ID, j)); finalize_block(FinalizeBlockParams { hash, sender, justification, finalizer: client.clone(), _phantom: PhantomData, }) .await }, } } } /// runs the background authorship task for the instant seal engine. /// instant-seal creates a new block for every transaction imported into /// the transaction pool. pub async fn run_instant_seal( InstantSealParams { block_import, env, client, pool, select_chain, consensus_data_provider, create_inherent_data_providers, }: InstantSealParams, ) where B: BlockT + 'static, BI: BlockImport + Send + Sync + 'static, C: HeaderBackend + Finalizer + ProvideRuntimeApi + 'static, CB: ClientBackend + 'static, E: Environment + 'static, E::Proposer: Proposer, SC: SelectChain + 'static, TP: TransactionPool, CIDP: CreateInherentDataProviders, P: codec::Encode + Send + Sync + 'static, { // instant-seal creates blocks as soon as transactions are imported // into the transaction pool. let commands_stream = pool.import_notification_stream().map(|_| EngineCommand::SealNewBlock { create_empty: true, finalize: false, parent_hash: None, sender: None, }); run_manual_seal(ManualSealParams { block_import, env, client, pool, commands_stream, select_chain, consensus_data_provider, create_inherent_data_providers, }) .await } /// Runs the background authorship task for the instant seal engine. /// instant-seal creates a new block for every transaction imported into /// the transaction pool. /// /// This function will finalize the block immediately as well. If you don't /// want this behavior use `run_instant_seal` instead. pub async fn run_instant_seal_and_finalize( InstantSealParams { block_import, env, client, pool, select_chain, consensus_data_provider, create_inherent_data_providers, }: InstantSealParams, ) where B: BlockT + 'static, BI: BlockImport + Send + Sync + 'static, C: HeaderBackend + Finalizer + ProvideRuntimeApi + 'static, CB: ClientBackend + 'static, E: Environment + 'static, E::Proposer: Proposer, SC: SelectChain + 'static, TP: TransactionPool, CIDP: CreateInherentDataProviders, P: codec::Encode + Send + Sync + 'static, { // Creates and finalizes blocks as soon as transactions are imported // into the transaction pool. let commands_stream = pool.import_notification_stream().map(|_| EngineCommand::SealNewBlock { create_empty: false, finalize: true, parent_hash: None, sender: None, }); run_manual_seal(ManualSealParams { block_import, env, client, pool, commands_stream, select_chain, consensus_data_provider, create_inherent_data_providers, }) .await } /// Creates a future for delayed finalization of manual sealed blocks. /// /// The future needs to be spawned in the background alongside the /// [`run_manual_seal`]/[`run_instant_seal`] future. It is required that /// [`EngineCommand::SealNewBlock`] is send with `finalize = false` to not finalize blocks directly /// after building them. This also means that delayed finality can not be used with /// [`run_instant_seal_and_finalize`]. pub async fn run_delayed_finalize( DelayedFinalizeParams { client, spawn_handle, delay_sec }: DelayedFinalizeParams, ) where B: BlockT + 'static, CB: ClientBackend + 'static, C: HeaderBackend + Finalizer + ProvideRuntimeApi + BlockchainEvents + 'static, S: SpawnNamed, { let mut block_import_stream = client.import_notification_stream(); while let Some(notification) = block_import_stream.next().await { let delay = Delay::new(Duration::from_secs(delay_sec)); let cloned_client = client.clone(); spawn_handle.spawn( "delayed-finalize", None, Box::pin(async move { delay.await; finalize_block(FinalizeBlockParams { hash: notification.hash, sender: None, justification: None, finalizer: cloned_client, _phantom: PhantomData, }) .await }), ); } } #[cfg(test)] mod tests { use super::*; use sc_basic_authorship::ProposerFactory; use sc_consensus::ImportedAux; use sc_transaction_pool::{BasicPool, FullChainApi, Options, RevalidationType}; use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool, TransactionSource}; use sp_inherents::InherentData; use sp_runtime::generic::{Digest, DigestItem}; use substrate_test_runtime_client::{ AccountKeyring::*, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt, }; use substrate_test_runtime_transaction_pool::{uxt, TestApi}; fn api() -> Arc { Arc::new(TestApi::empty()) } const SOURCE: TransactionSource = TransactionSource::External; struct TestDigestProvider { _client: Arc, } impl ConsensusDataProvider for TestDigestProvider where B: BlockT, C: ProvideRuntimeApi + Send + Sync, { type Proof = (); fn create_digest( &self, _parent: &B::Header, _inherents: &InherentData, ) -> Result { Ok(Digest { logs: vec![] }) } fn append_block_import( &self, _parent: &B::Header, params: &mut BlockImportParams, _inherents: &InherentData, _proof: Self::Proof, ) -> Result<(), Error> { params.post_digests.push(DigestItem::Other(vec![1])); Ok(()) } } #[tokio::test] async fn instant_seal() { let builder = TestClientBuilder::new(); let (client, select_chain) = builder.build_with_longest_chain(); let client = Arc::new(client); let spawner = sp_core::testing::TaskExecutor::new(); let genesis_hash = client.info().genesis_hash; let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone())); let pool = Arc::new(BasicPool::with_revalidation_type( Options::default(), true.into(), pool_api, None, RevalidationType::Full, spawner.clone(), 0, genesis_hash, genesis_hash, )); let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None); // this test checks that blocks are created as soon as transactions are imported into the // pool. let (sender, receiver) = futures::channel::oneshot::channel(); let mut sender = Arc::new(Some(sender)); let commands_stream = pool.pool().validated_pool().import_notification_stream().map(move |_| { // we're only going to submit one tx so this fn will only be called once. let mut_sender = Arc::get_mut(&mut sender).unwrap(); let sender = std::mem::take(mut_sender); EngineCommand::SealNewBlock { create_empty: false, finalize: true, parent_hash: None, sender, } }); // spawn the background authorship task tokio::spawn(run_manual_seal(ManualSealParams { block_import: client.clone(), env, client: client.clone(), pool: pool.clone(), commands_stream, select_chain, create_inherent_data_providers: |_, _| async { Ok(()) }, consensus_data_provider: None, })); // submit a transaction to pool. let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await; // assert that it was successfully imported assert!(result.is_ok()); // assert that the background task returns ok let created_block = receiver.await.unwrap().unwrap(); assert_eq!( created_block, CreatedBlock { hash: created_block.hash, aux: ImportedAux { header_only: false, clear_justification_requests: false, needs_justification: false, bad_justification: false, is_new_best: true, }, proof_size: 0 } ); // assert that there's a new block in the db. assert!(client.header(created_block.hash).unwrap().is_some()); assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1) } // TODO: enable once the flakiness is fixed // See https://github.com/paritytech/polkadot-sdk/issues/3603 //#[tokio::test] #[allow(unused)] async fn instant_seal_delayed_finalize() { let builder = TestClientBuilder::new(); let (client, select_chain) = builder.build_with_longest_chain(); let client = Arc::new(client); let spawner = sp_core::testing::TaskExecutor::new(); let genesis_hash = client.info().genesis_hash; let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone())); let pool = Arc::new(BasicPool::with_revalidation_type( Options::default(), true.into(), pool_api, None, RevalidationType::Full, spawner.clone(), 0, genesis_hash, genesis_hash, )); let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None); // this test checks that blocks are created as soon as transactions are imported into the // pool. let (sender, receiver) = futures::channel::oneshot::channel(); let mut sender = Arc::new(Some(sender)); let commands_stream = pool.pool().validated_pool().import_notification_stream().map(move |_| { // we're only going to submit one tx so this fn will only be called once. let mut_sender = Arc::get_mut(&mut sender).unwrap(); let sender = std::mem::take(mut_sender); EngineCommand::SealNewBlock { create_empty: false, // set to `false`, expecting to be finalized by delayed finalize finalize: false, parent_hash: None, sender, } }); // spawn the background authorship task tokio::spawn(run_manual_seal(ManualSealParams { block_import: client.clone(), commands_stream, env, client: client.clone(), pool: pool.clone(), select_chain, create_inherent_data_providers: |_, _| async { Ok(()) }, consensus_data_provider: None, })); let delay_sec = 5; // spawn the background finality task tokio::spawn(run_delayed_finalize(DelayedFinalizeParams { client: client.clone(), delay_sec, spawn_handle: spawner, })); let mut finality_stream = client.finality_notification_stream(); // submit a transaction to pool. let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await; // assert that it was successfully imported assert!(result.is_ok()); // assert that the background task returns ok let created_block = receiver.await.unwrap().unwrap(); assert_eq!( created_block, CreatedBlock { hash: created_block.hash, aux: ImportedAux { header_only: false, clear_justification_requests: false, needs_justification: false, bad_justification: false, is_new_best: true, }, proof_size: created_block.proof_size } ); // assert that there's a new block in the db. assert!(client.header(created_block.hash).unwrap().is_some()); assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1); assert_eq!(client.info().finalized_hash, client.info().genesis_hash); let finalized = finality_stream.select_next_some().await; assert_eq!(finalized.hash, created_block.hash); } #[tokio::test] async fn manual_seal_and_finalization() { let builder = TestClientBuilder::new(); let (client, select_chain) = builder.build_with_longest_chain(); let client = Arc::new(client); let spawner = sp_core::testing::TaskExecutor::new(); let genesis_hash = client.info().genesis_hash; let pool_api = Arc::new(FullChainApi::new(client.clone(), None, &spawner.clone())); let pool = Arc::new(BasicPool::with_revalidation_type( Options::default(), true.into(), pool_api, None, RevalidationType::Full, spawner.clone(), 0, genesis_hash, genesis_hash, )); let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None); // this test checks that blocks are created as soon as an engine command is sent over the // stream. let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024); // spawn the background authorship task tokio::spawn(run_manual_seal(ManualSealParams { block_import: client.clone(), env, client: client.clone(), pool: pool.clone(), commands_stream, select_chain, consensus_data_provider: None, create_inherent_data_providers: |_, _| async { Ok(()) }, })); // submit a transaction to pool. let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await; // assert that it was successfully imported assert!(result.is_ok()); let (tx, rx) = futures::channel::oneshot::channel(); sink.send(EngineCommand::SealNewBlock { parent_hash: None, sender: Some(tx), create_empty: false, finalize: false, }) .await .unwrap(); let created_block = rx.await.unwrap().unwrap(); // assert that the background task returns ok assert_eq!( created_block, CreatedBlock { hash: created_block.hash, aux: ImportedAux { header_only: false, clear_justification_requests: false, needs_justification: false, bad_justification: false, is_new_best: true, }, proof_size: 0 } ); // assert that there's a new block in the db. let header = client.header(created_block.hash).unwrap().unwrap(); let (tx, rx) = futures::channel::oneshot::channel(); sink.send(EngineCommand::FinalizeBlock { sender: Some(tx), hash: header.hash(), justification: None, }) .await .unwrap(); // check that the background task returns ok: rx.await.unwrap().unwrap(); } #[tokio::test] async fn manual_seal_fork_blocks() { let builder = TestClientBuilder::new(); let (client, select_chain) = builder.build_with_longest_chain(); let client = Arc::new(client); let pool_api = Arc::new(FullChainApi::new( client.clone(), None, &sp_core::testing::TaskExecutor::new(), )); let spawner = sp_core::testing::TaskExecutor::new(); let genesis_hash = client.info().genesis_hash; let pool = Arc::new(BasicPool::with_revalidation_type( Options::default(), true.into(), pool_api.clone(), None, RevalidationType::Full, spawner.clone(), 0, genesis_hash, genesis_hash, )); let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None); // this test checks that blocks are created as soon as an engine command is sent over the // stream. let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024); // spawn the background authorship task tokio::spawn(run_manual_seal(ManualSealParams { block_import: client.clone(), env, client: client.clone(), pool: pool.clone(), commands_stream, select_chain, consensus_data_provider: None, create_inherent_data_providers: |_, _| async { Ok(()) }, })); // submit a transaction to pool. let result = pool.submit_one(genesis_hash, SOURCE, uxt(Alice, 0)).await; // assert that it was successfully imported assert!(result.is_ok()); let (tx, rx) = futures::channel::oneshot::channel(); sink.send(EngineCommand::SealNewBlock { parent_hash: None, sender: Some(tx), create_empty: false, finalize: false, }) .await .unwrap(); let created_block = rx.await.unwrap().unwrap(); // assert that the background task returns ok assert_eq!( created_block, CreatedBlock { hash: created_block.hash, aux: ImportedAux { header_only: false, clear_justification_requests: false, needs_justification: false, bad_justification: false, is_new_best: true }, proof_size: 0 } ); assert!(pool.submit_one(created_block.hash, SOURCE, uxt(Alice, 1)).await.is_ok()); let header = client.header(created_block.hash).expect("db error").expect("imported above"); assert_eq!(header.number, 1); pool.maintain(sc_transaction_pool_api::ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None, }) .await; let (tx1, rx1) = futures::channel::oneshot::channel(); assert!(sink .send(EngineCommand::SealNewBlock { parent_hash: Some(created_block.hash), sender: Some(tx1), create_empty: false, finalize: false, }) .await .is_ok()); assert_matches::assert_matches!(rx1.await.expect("should be no error receiving"), Ok(_)); assert!(pool.submit_one(created_block.hash, SOURCE, uxt(Bob, 0)).await.is_ok()); let (tx2, rx2) = futures::channel::oneshot::channel(); assert!(sink .send(EngineCommand::SealNewBlock { parent_hash: Some(created_block.hash), sender: Some(tx2), create_empty: false, finalize: false, }) .await .is_ok()); let imported = rx2.await.unwrap().unwrap(); // assert that fork block is in the db assert!(client.header(imported.hash).unwrap().is_some()) } #[tokio::test] async fn manual_seal_post_hash() { let builder = TestClientBuilder::new(); let (client, select_chain) = builder.build_with_longest_chain(); let client = Arc::new(client); let spawner = sp_core::testing::TaskExecutor::new(); let genesis_hash = client.header(client.info().genesis_hash).unwrap().unwrap().hash(); let pool = Arc::new(BasicPool::with_revalidation_type( Options::default(), true.into(), api(), None, RevalidationType::Full, spawner.clone(), 0, genesis_hash, genesis_hash, )); let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None); let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024); // spawn the background authorship task tokio::spawn(run_manual_seal(ManualSealParams { block_import: client.clone(), env, client: client.clone(), pool: pool.clone(), commands_stream, select_chain, // use a provider that pushes some post digest data consensus_data_provider: Some(Box::new(TestDigestProvider { _client: client.clone() })), create_inherent_data_providers: |_, _| async { Ok(()) }, })); let (tx, rx) = futures::channel::oneshot::channel(); sink.send(EngineCommand::SealNewBlock { parent_hash: None, sender: Some(tx), create_empty: true, finalize: false, }) .await .unwrap(); let created_block = rx.await.unwrap().unwrap(); // assert that the background task returned the actual header hash let header = client.header(created_block.hash).unwrap().unwrap(); assert_eq!(header.number, 1); } }