// Copyright 2020 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks. //! This is suitable for a testing environment. use sp_consensus::{ self, BlockImport, Environment, Proposer, BlockCheckParams, ForkChoiceStrategy, BlockImportParams, BlockOrigin, ImportResult, SelectChain, import_queue::{ BasicQueue, CacheKeyId, Verifier, BoxBlockImport, }, }; use sp_inherents::InherentDataProviders; use sp_runtime::{traits::Block as BlockT, Justification}; use sc_client_api::backend::Backend as ClientBackend; use futures::prelude::*; use sc_transaction_pool::txpool; use std::collections::HashMap; use std::sync::Arc; pub mod rpc; mod error; mod finalize_block; mod seal_new_block; use finalize_block::{finalize_block, FinalizeBlockParams}; use seal_new_block::{seal_new_block, SealBlockParams}; pub use error::Error; pub use rpc::{EngineCommand, CreatedBlock}; /// The synchronous block-import worker of the engine. pub struct ManualSealBlockImport { inner: I, } impl From for ManualSealBlockImport { fn from(i: I) -> Self { ManualSealBlockImport { inner: i } } } impl BlockImport for ManualSealBlockImport where B: BlockT, I: BlockImport, { type Error = I::Error; type Transaction = (); fn check_block(&mut self, block: BlockCheckParams) -> Result { self.inner.check_block(block) } fn import_block( &mut self, block: BlockImportParams, cache: HashMap>, ) -> Result { self.inner.import_block(block, cache) } } /// The verifier for the manual seal engine; instantly finalizes. struct ManualSealVerifier; impl Verifier for ManualSealVerifier { fn verify( &mut self, origin: BlockOrigin, header: B::Header, justification: Option, body: Option>, ) -> Result<(BlockImportParams, Option)>>), String> { let mut import_params = BlockImportParams::new(origin, header); import_params.justification = justification; import_params.body = body; import_params.finalized = true; import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain); Ok((import_params, None)) } } /// Instantiate the import queue for the manual seal consensus engine. pub fn import_queue(block_import: BoxBlockImport) -> BasicQueue { BasicQueue::new( ManualSealVerifier, block_import, None, None, ) } /// Creates the background authorship task for the manual seal engine. pub async fn run_manual_seal( mut block_import: BoxBlockImport, mut env: E, backend: Arc, pool: Arc>, mut seal_block_channel: S, select_chain: C, inherent_data_providers: InherentDataProviders, ) where B: BlockT + 'static, CB: ClientBackend + 'static, E: Environment + 'static, E::Error: std::fmt::Display, >::Error: std::fmt::Display, A: txpool::ChainApi::Hash> + 'static, S: Stream::Hash>> + Unpin + 'static, C: SelectChain + 'static, { while let Some(command) = seal_block_channel.next().await { match command { EngineCommand::SealNewBlock { create_empty, finalize, parent_hash, sender, } => { seal_new_block( SealBlockParams { sender, parent_hash, finalize, create_empty, env: &mut env, select_chain: &select_chain, block_import: &mut block_import, inherent_data_provider: &inherent_data_providers, pool: pool.clone(), backend: backend.clone(), } ).await; } EngineCommand::FinalizeBlock { hash, sender, justification } => { finalize_block( FinalizeBlockParams { hash, sender, justification, backend: backend.clone(), } ).await } } } } /// runs the background authorship task for the instant seal engine. /// instant-seal creates a new block for every transaction imported into /// the transaction pool. pub async fn run_instant_seal( block_import: BoxBlockImport, env: E, backend: Arc, pool: Arc>, select_chain: C, inherent_data_providers: InherentDataProviders, ) where A: txpool::ChainApi::Hash> + 'static, B: BlockT + 'static, CB: ClientBackend + 'static, E: Environment + 'static, E::Error: std::fmt::Display, >::Error: std::fmt::Display, C: SelectChain + 'static { // instant-seal creates blocks as soon as transactions are imported // into the transaction pool. let seal_block_channel = pool.validated_pool().import_notification_stream() .map(|_| { EngineCommand::SealNewBlock { create_empty: false, finalize: false, parent_hash: None, sender: None, } }); run_manual_seal( block_import, env, backend, pool, seal_block_channel, select_chain, inherent_data_providers, ).await } #[cfg(test)] mod tests { use super::*; use substrate_test_runtime_client::{ DefaultTestClientBuilderExt, TestClientBuilderExt, AccountKeyring::*, TestClientBuilder, }; use sc_transaction_pool::{ BasicPool, txpool::Options, }; use substrate_test_runtime_transaction_pool::{TestApi, uxt}; use sp_transaction_pool::TransactionPool; use sp_runtime::generic::BlockId; use sp_blockchain::HeaderBackend; use sp_consensus::ImportedAux; use sc_client::LongestChain; use sp_inherents::InherentDataProviders; use sc_basic_authorship::ProposerFactory; fn api() -> Arc { Arc::new(TestApi::empty()) } #[tokio::test] async fn instant_seal() { let builder = TestClientBuilder::new(); let backend = builder.backend(); let client = Arc::new(builder.build()); let select_chain = LongestChain::new(backend.clone()); let inherent_data_providers = InherentDataProviders::new(); let pool = Arc::new(BasicPool::new(Options::default(), api()).0); let env = ProposerFactory { transaction_pool: pool.clone(), client: client.clone(), }; // this test checks that blocks are created as soon as transactions are imported into the pool. let (sender, receiver) = futures::channel::oneshot::channel(); let mut sender = Arc::new(Some(sender)); let stream = pool.pool().validated_pool().import_notification_stream() .map(move |_| { // we're only going to submit one tx so this fn will only be called once. let mut_sender = Arc::get_mut(&mut sender).unwrap(); let sender = std::mem::replace(mut_sender, None); EngineCommand::SealNewBlock { create_empty: false, finalize: true, parent_hash: None, sender } }); let future = run_manual_seal( Box::new(client.clone()), env, backend.clone(), pool.pool().clone(), stream, select_chain, inherent_data_providers, ); std::thread::spawn(|| { let mut rt = tokio::runtime::Runtime::new().unwrap(); // spawn the background authorship task rt.block_on(future); }); // submit a transaction to pool. let result = pool.submit_one(&BlockId::Number(0), uxt(Alice, 0)).await; // assert that it was successfully imported assert!(result.is_ok()); // assert that the background task returns ok let created_block = receiver.await.unwrap().unwrap(); assert_eq!( created_block, CreatedBlock { hash: created_block.hash.clone(), aux: ImportedAux { header_only: false, clear_justification_requests: false, needs_justification: false, bad_justification: false, needs_finality_proof: false, is_new_best: true, } } ); // assert that there's a new block in the db. assert!(backend.blockchain().header(BlockId::Number(1)).unwrap().is_some()) } #[tokio::test] async fn manual_seal_and_finalization() { let builder = TestClientBuilder::new(); let backend = builder.backend(); let client = Arc::new(builder.build()); let select_chain = LongestChain::new(backend.clone()); let inherent_data_providers = InherentDataProviders::new(); let pool = Arc::new(BasicPool::new(Options::default(), api()).0); let env = ProposerFactory { transaction_pool: pool.clone(), client: client.clone(), }; // this test checks that blocks are created as soon as an engine command is sent over the stream. let (mut sink, stream) = futures::channel::mpsc::channel(1024); let future = run_manual_seal( Box::new(client.clone()), env, backend.clone(), pool.pool().clone(), stream, select_chain, inherent_data_providers, ); std::thread::spawn(|| { let mut rt = tokio::runtime::Runtime::new().unwrap(); // spawn the background authorship task rt.block_on(future); }); // submit a transaction to pool. let result = pool.submit_one(&BlockId::Number(0), uxt(Alice, 0)).await; // assert that it was successfully imported assert!(result.is_ok()); let (tx, rx) = futures::channel::oneshot::channel(); sink.send(EngineCommand::SealNewBlock { parent_hash: None, sender: Some(tx), create_empty: false, finalize: false, }).await.unwrap(); let created_block = rx.await.unwrap().unwrap(); // assert that the background task returns ok assert_eq!( created_block, CreatedBlock { hash: created_block.hash.clone(), aux: ImportedAux { header_only: false, clear_justification_requests: false, needs_justification: false, bad_justification: false, needs_finality_proof: false, is_new_best: true, } } ); // assert that there's a new block in the db. let header = backend.blockchain().header(BlockId::Number(1)).unwrap().unwrap(); let (tx, rx) = futures::channel::oneshot::channel(); sink.send(EngineCommand::FinalizeBlock { sender: Some(tx), hash: header.hash(), justification: None }).await.unwrap(); // assert that the background task returns ok assert_eq!(rx.await.unwrap().unwrap(), ()); } #[tokio::test] async fn manual_seal_fork_blocks() { let builder = TestClientBuilder::new(); let backend = builder.backend(); let client = Arc::new(builder.build()); let select_chain = LongestChain::new(backend.clone()); let inherent_data_providers = InherentDataProviders::new(); let pool_api = api(); let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone()).0); let env = ProposerFactory { transaction_pool: pool.clone(), client: client.clone(), }; // this test checks that blocks are created as soon as an engine command is sent over the stream. let (mut sink, stream) = futures::channel::mpsc::channel(1024); let future = run_manual_seal( Box::new(client.clone()), env, backend.clone(), pool.pool().clone(), stream, select_chain, inherent_data_providers, ); std::thread::spawn(|| { let mut rt = tokio::runtime::Runtime::new().unwrap(); // spawn the background authorship task rt.block_on(future); }); // submit a transaction to pool. let result = pool.submit_one(&BlockId::Number(0), uxt(Alice, 0)).await; // assert that it was successfully imported assert!(result.is_ok()); let (tx, rx) = futures::channel::oneshot::channel(); sink.send(EngineCommand::SealNewBlock { parent_hash: None, sender: Some(tx), create_empty: false, finalize: false, }).await.unwrap(); let created_block = rx.await.unwrap().unwrap(); pool_api.increment_nonce(Alice.into()); // assert that the background task returns ok assert_eq!( created_block, CreatedBlock { hash: created_block.hash.clone(), aux: ImportedAux { header_only: false, clear_justification_requests: false, needs_justification: false, bad_justification: false, needs_finality_proof: false, is_new_best: true } } ); // assert that there's a new block in the db. assert!(backend.blockchain().header(BlockId::Number(0)).unwrap().is_some()); assert!(pool.submit_one(&BlockId::Number(1), uxt(Alice, 1)).await.is_ok()); let (tx1, rx1) = futures::channel::oneshot::channel(); assert!(sink.send(EngineCommand::SealNewBlock { parent_hash: Some(created_block.hash.clone()), sender: Some(tx1), create_empty: false, finalize: false, }).await.is_ok()); assert!(rx1.await.unwrap().is_ok()); assert!(backend.blockchain().header(BlockId::Number(1)).unwrap().is_some()); pool_api.increment_nonce(Alice.into()); assert!(pool.submit_one(&BlockId::Number(2), uxt(Alice, 2)).await.is_ok()); let (tx2, rx2) = futures::channel::oneshot::channel(); assert!(sink.send(EngineCommand::SealNewBlock { parent_hash: Some(created_block.hash), sender: Some(tx2), create_empty: false, finalize: false, }).await.is_ok()); let imported = rx2.await.unwrap().unwrap(); // assert that fork block is in the db assert!(backend.blockchain().header(BlockId::Hash(imported.hash)).unwrap().is_some()) } }