Skip to content
lib.rs 16.4 KiB
Newer Older
Web3 Philosopher's avatar
Web3 Philosopher committed
// This file is part of Substrate.

Bastian Köcher's avatar
Bastian Köcher committed
// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
Web3 Philosopher's avatar
Web3 Philosopher committed
// it under the terms of the GNU General Public License as published by
Nikolay Volf's avatar
Nikolay Volf committed
// the Free Software Foundation, either version 3 of the License, or
Web3 Philosopher's avatar
Web3 Philosopher committed
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
Web3 Philosopher's avatar
Web3 Philosopher committed
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Web3 Philosopher's avatar
Web3 Philosopher committed
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
Web3 Philosopher's avatar
Web3 Philosopher committed
//! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks.
//! This is suitable for a testing environment.

use futures::prelude::*;
Web3 Philosopher's avatar
Web3 Philosopher committed
use sp_consensus::{
	Environment, Proposer, SelectChain, BlockImport,
	ForkChoiceStrategy, BlockImportParams, BlockOrigin,
	import_queue::{Verifier, BasicQueue, CacheKeyId, BoxBlockImport},
Web3 Philosopher's avatar
Web3 Philosopher committed
};
use sp_inherents::CreateInherentDataProviders;
use sp_runtime::{traits::Block as BlockT, Justifications, ConsensusEngineId};
use sc_client_api::backend::{Backend as ClientBackend, Finalizer};
use sc_transaction_pool::{ChainApi, Pool};
use prometheus_endpoint::Registry;
Web3 Philosopher's avatar
Web3 Philosopher committed

mod error;
mod finalize_block;
mod seal_block;

pub mod consensus;
	consensus::ConsensusDataProvider,
	finalize_block::{finalize_block, FinalizeBlockParams},
	seal_block::{SealBlockParams, seal_block, MAX_PROPOSAL_DURATION},
use sp_api::{ProvideRuntimeApi, TransactionFor};
/// The `ConsensusEngineId` of Manual Seal.
pub const MANUAL_SEAL_ENGINE_ID: ConsensusEngineId = [b'm', b'a', b'n', b'l'];

Web3 Philosopher's avatar
Web3 Philosopher committed
/// The verifier for the manual seal engine; instantly finalizes.
struct ManualSealVerifier;

#[async_trait::async_trait]
Web3 Philosopher's avatar
Web3 Philosopher committed
impl<B: BlockT> Verifier<B> for ManualSealVerifier {
Web3 Philosopher's avatar
Web3 Philosopher committed
		&mut self,
		origin: BlockOrigin,
		header: B::Header,
		justifications: Option<Justifications>,
Web3 Philosopher's avatar
Web3 Philosopher committed
		body: Option<Vec<B::Extrinsic>>,
	) -> Result<(BlockImportParams<B, ()>, Option<Vec<(CacheKeyId, Vec<u8>)>>), String> {
		let mut import_params = BlockImportParams::new(origin, header);
		import_params.justifications = justifications;
		import_params.body = body;
		import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
Web3 Philosopher's avatar
Web3 Philosopher committed

		Ok((import_params, None))
	}
}

/// Instantiate the import queue for the manual seal consensus engine.
pub fn import_queue<Block, Transaction>(
	block_import: BoxBlockImport<Block, Transaction>,
	spawner: &impl sp_core::traits::SpawnEssentialNamed,
	registry: Option<&Registry>,
) -> BasicQueue<Block, Transaction>
		Transaction: Send + Sync + 'static,
Web3 Philosopher's avatar
Web3 Philosopher committed
{
	BasicQueue::new(
		ManualSealVerifier,
Web3 Philosopher's avatar
Web3 Philosopher committed
		None,
/// Params required to start the instant sealing authorship task.
pub struct ManualSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, A: ChainApi, SC, CS, CIDP> {
	/// Block import instance for well. importing blocks.
	pub block_import: BI,

	/// The environment we are producing blocks for.
	pub env: E,

	/// Client instance
	pub client: Arc<C>,

	/// Shared reference to the transaction pool.

	/// Stream<Item = EngineCommands>, Basically the receiving end of a channel for sending commands to
	/// the authorship task.
	pub commands_stream: CS,

	/// SelectChain strategy.
	pub select_chain: SC,

	/// Digest provider for inclusion in blocks.
	pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Transaction = TransactionFor<C, B>>>>,

	/// Something that can create the inherent data providers.
	pub create_inherent_data_providers: CIDP,
}

/// Params required to start the manual sealing authorship task.
pub struct InstantSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, A: ChainApi, SC, CIDP> {
	/// Block import instance for well. importing blocks.
	pub block_import: BI,

	/// The environment we are producing blocks for.
	pub env: E,

	/// Client instance
	pub client: Arc<C>,

	/// Shared reference to the transaction pool.

	/// SelectChain strategy.
	pub select_chain: SC,

	/// Digest provider for inclusion in blocks.
	pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Transaction = TransactionFor<C, B>>>>,

	/// Something that can create the inherent data providers.
	pub create_inherent_data_providers: CIDP,
Web3 Philosopher's avatar
Web3 Philosopher committed
/// Creates the background authorship task for the manual seal engine.
pub async fn run_manual_seal<B, BI, CB, E, C, A, SC, CS, CIDP>(
	ManualSealParams {
		mut block_import,
		mut env,
		client,
		pool,
		mut commands_stream,
		select_chain,
		consensus_data_provider,
		create_inherent_data_providers,
	}: ManualSealParams<B, BI, E, C, A, SC, CS, CIDP>
Web3 Philosopher's avatar
Web3 Philosopher committed
)
	where
Web3 Philosopher's avatar
Web3 Philosopher committed
		B: BlockT + 'static,
		BI: BlockImport<B, Error = sp_consensus::Error, Transaction = sp_api::TransactionFor<C, B>>
			+ Send + Sync + 'static,
		C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
Web3 Philosopher's avatar
Web3 Philosopher committed
		CB: ClientBackend<B> + 'static,
		E: Environment<B> + 'static,
		E::Proposer: Proposer<B, Transaction = TransactionFor<C, B>>,
		CS: Stream<Item=EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static,
		TransactionFor<C, B>: 'static,
		CIDP: CreateInherentDataProviders<B, ()>,
Web3 Philosopher's avatar
Web3 Philosopher committed
{
	while let Some(command) = commands_stream.next().await {
Web3 Philosopher's avatar
Web3 Philosopher committed
		match command {
			EngineCommand::SealNewBlock {
				create_empty,
				finalize,
				parent_hash,
				sender,
			} => {
Web3 Philosopher's avatar
Web3 Philosopher committed
					SealBlockParams {
						sender,
						parent_hash,
						finalize,
						create_empty,
						env: &mut env,
						select_chain: &select_chain,
						block_import: &mut block_import,
						consensus_data_provider: consensus_data_provider.as_ref().map(|p| &**p),
Web3 Philosopher's avatar
Web3 Philosopher committed
						pool: pool.clone(),
						create_inherent_data_providers: &create_inherent_data_providers,
Web3 Philosopher's avatar
Web3 Philosopher committed
					}
				).await;
			}
			EngineCommand::FinalizeBlock { hash, sender, justification } => {
				let justification = justification.map(|j| (MANUAL_SEAL_ENGINE_ID, j));
Web3 Philosopher's avatar
Web3 Philosopher committed
				finalize_block(
					FinalizeBlockParams {
						hash,
						sender,
						justification,
Web3 Philosopher's avatar
Web3 Philosopher committed
					}
				).await
			}
		}
	}
}

/// runs the background authorship task for the instant seal engine.
/// instant-seal creates a new block for every transaction imported into
/// the transaction pool.
pub async fn run_instant_seal<B, BI, CB, E, C, A, SC, CIDP>(
	InstantSealParams {
		block_import,
		env,
		client,
		pool,
		select_chain,
		consensus_data_provider,
		create_inherent_data_providers,
	}: InstantSealParams<B, BI, E, C, A, SC, CIDP>
Web3 Philosopher's avatar
Web3 Philosopher committed
)
	where
Web3 Philosopher's avatar
Web3 Philosopher committed
		B: BlockT + 'static,
		BI: BlockImport<B, Error = sp_consensus::Error, Transaction = sp_api::TransactionFor<C, B>>
			+ Send + Sync + 'static,
		C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
Web3 Philosopher's avatar
Web3 Philosopher committed
		CB: ClientBackend<B> + 'static,
		E: Environment<B> + 'static,
		E::Proposer: Proposer<B, Transaction = TransactionFor<C, B>>,
		SC: SelectChain<B> + 'static,
		TransactionFor<C, B>: 'static,
		CIDP: CreateInherentDataProviders<B, ()>,
Web3 Philosopher's avatar
Web3 Philosopher committed
{
	// instant-seal creates blocks as soon as transactions are imported
	// into the transaction pool.
	let commands_stream = pool.validated_pool()
		.import_notification_stream()
Web3 Philosopher's avatar
Web3 Philosopher committed
		.map(|_| {
			EngineCommand::SealNewBlock {
				create_empty: false,
				finalize: false,
				parent_hash: None,
				sender: None,
			}
		});

	run_manual_seal(
		ManualSealParams {
			block_import,
			env,
			client,
			pool,
			commands_stream,
			select_chain,
			consensus_data_provider,
			create_inherent_data_providers,
Web3 Philosopher's avatar
Web3 Philosopher committed
	).await
}

#[cfg(test)]
mod tests {
	use super::*;
	use substrate_test_runtime_client::{
		DefaultTestClientBuilderExt,
		TestClientBuilderExt,
		AccountKeyring::*,
		TestClientBuilder,
	};
	use sc_transaction_pool::{BasicPool, RevalidationType, Options};
	use substrate_test_runtime_transaction_pool::{TestApi, uxt};
	use sp_transaction_pool::{TransactionPool, MaintainedTransactionPool, TransactionSource};
Web3 Philosopher's avatar
Web3 Philosopher committed
	use sp_runtime::generic::BlockId;
	use sp_consensus::ImportedAux;
	use sc_basic_authorship::ProposerFactory;
	use sc_client_api::BlockBackend;
	fn api() -> Arc<TestApi> {
		Arc::new(TestApi::empty())
	const SOURCE: TransactionSource = TransactionSource::External;

Web3 Philosopher's avatar
Web3 Philosopher committed
	#[tokio::test]
	async fn instant_seal() {
		let builder = TestClientBuilder::new();
		let (client, select_chain) = builder.build_with_longest_chain();
		let client = Arc::new(client);
		let spawner = sp_core::testing::TaskExecutor::new();
		let pool = Arc::new(BasicPool::with_revalidation_type(
			Options::default(),
			true.into(),
			api(),
			None,
			RevalidationType::Full,
			spawner.clone(),
			0,
			spawner.clone(),
Web3 Philosopher's avatar
Web3 Philosopher committed
		// this test checks that blocks are created as soon as transactions are imported into the pool.
		let (sender, receiver) = futures::channel::oneshot::channel();
		let mut sender = Arc::new(Some(sender));
		let commands_stream = pool.pool().validated_pool().import_notification_stream()
Web3 Philosopher's avatar
Web3 Philosopher committed
			.map(move |_| {
				// we're only going to submit one tx so this fn will only be called once.
				let mut_sender =  Arc::get_mut(&mut sender).unwrap();
				let sender = std::mem::take(mut_sender);
Web3 Philosopher's avatar
Web3 Philosopher committed
				EngineCommand::SealNewBlock {
					create_empty: false,
					finalize: true,
					parent_hash: None,
					sender
				}
			});
		let future = run_manual_seal(
			ManualSealParams {
				block_import: client.clone(),
				env,
				client: client.clone(),
				pool: pool.pool().clone(),
				commands_stream,
				select_chain,
				create_inherent_data_providers: |_, _| async { Ok(()) },
				consensus_data_provider: None,
			}
Web3 Philosopher's avatar
Web3 Philosopher committed
		);
		std::thread::spawn(|| {
			let mut rt = tokio::runtime::Runtime::new().unwrap();
			// spawn the background authorship task
			rt.block_on(future);
		});
		// submit a transaction to pool.
		let result = pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Alice, 0)).await;
Web3 Philosopher's avatar
Web3 Philosopher committed
		// assert that it was successfully imported
		assert!(result.is_ok());
		// assert that the background task returns ok
		let created_block = receiver.await.unwrap().unwrap();
		assert_eq!(
			created_block,
			CreatedBlock {
				hash: created_block.hash.clone(),
				aux: ImportedAux {
					header_only: false,
					clear_justification_requests: false,
					needs_justification: false,
					bad_justification: false,
					is_new_best: true,
				}
			}
		);
		// assert that there's a new block in the db.
		assert!(client.header(&BlockId::Number(1)).unwrap().is_some())
Web3 Philosopher's avatar
Web3 Philosopher committed
	}

	#[tokio::test]
	async fn manual_seal_and_finalization() {
		let builder = TestClientBuilder::new();
		let (client, select_chain) = builder.build_with_longest_chain();
		let client = Arc::new(client);
		let spawner = sp_core::testing::TaskExecutor::new();
		let pool = Arc::new(BasicPool::with_revalidation_type(
			Options::default(),
			true.into(),
			api(),
			None,
			RevalidationType::Full,
			spawner.clone(),
			spawner.clone(),
Web3 Philosopher's avatar
Web3 Philosopher committed
		// this test checks that blocks are created as soon as an engine command is sent over the stream.
		let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);
Web3 Philosopher's avatar
Web3 Philosopher committed
		let future = run_manual_seal(
			ManualSealParams {
				block_import: client.clone(),
				env,
				client: client.clone(),
				pool: pool.pool().clone(),
				commands_stream,
				select_chain,
				consensus_data_provider: None,
				create_inherent_data_providers: |_, _| async { Ok(()) },
Web3 Philosopher's avatar
Web3 Philosopher committed
		);
		std::thread::spawn(|| {
			let mut rt = tokio::runtime::Runtime::new().unwrap();
			// spawn the background authorship task
			rt.block_on(future);
		});
		// submit a transaction to pool.
		let result = pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Alice, 0)).await;
Web3 Philosopher's avatar
Web3 Philosopher committed
		// assert that it was successfully imported
		assert!(result.is_ok());
		let (tx, rx) = futures::channel::oneshot::channel();
		sink.send(EngineCommand::SealNewBlock {
			parent_hash: None,
			sender: Some(tx),
			create_empty: false,
			finalize: false,
		}).await.unwrap();
		let created_block = rx.await.unwrap().unwrap();

		// assert that the background task returns ok
		assert_eq!(
			created_block,
			CreatedBlock {
				hash: created_block.hash.clone(),
				aux: ImportedAux {
					header_only: false,
					clear_justification_requests: false,
					needs_justification: false,
					bad_justification: false,
					is_new_best: true,
				}
			}
		);
		// assert that there's a new block in the db.
		let header = client.header(&BlockId::Number(1)).unwrap().unwrap();
Web3 Philosopher's avatar
Web3 Philosopher committed
		let (tx, rx) = futures::channel::oneshot::channel();
		sink.send(EngineCommand::FinalizeBlock {
			sender: Some(tx),
			hash: header.hash(),
			justification: None
		}).await.unwrap();
		// assert that the background task returns ok
		assert_eq!(rx.await.unwrap().unwrap(), ());
	}

	#[tokio::test]
	async fn manual_seal_fork_blocks() {
		let builder = TestClientBuilder::new();
		let (client, select_chain) = builder.build_with_longest_chain();
		let client = Arc::new(client);
		let pool_api = api();
		let spawner = sp_core::testing::TaskExecutor::new();
		let pool = Arc::new(BasicPool::with_revalidation_type(
			Options::default(),
			true.into(),
			pool_api.clone(),
			None,
			RevalidationType::Full,
			spawner.clone(),
			spawner.clone(),
Web3 Philosopher's avatar
Web3 Philosopher committed
		// this test checks that blocks are created as soon as an engine command is sent over the stream.
		let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024);
Web3 Philosopher's avatar
Web3 Philosopher committed
		let future = run_manual_seal(
			ManualSealParams {
				block_import: client.clone(),
				env,
				client: client.clone(),
				pool: pool.pool().clone(),
				commands_stream,
				select_chain,
				consensus_data_provider: None,
				create_inherent_data_providers: |_, _| async { Ok(()) },
Web3 Philosopher's avatar
Web3 Philosopher committed
		);
		std::thread::spawn(|| {
			let mut rt = tokio::runtime::Runtime::new().unwrap();
			// spawn the background authorship task
			rt.block_on(future);
		});
		// submit a transaction to pool.
		let result = pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Alice, 0)).await;
Web3 Philosopher's avatar
Web3 Philosopher committed
		// assert that it was successfully imported
		assert!(result.is_ok());

		let (tx, rx) = futures::channel::oneshot::channel();
		sink.send(EngineCommand::SealNewBlock {
			parent_hash: None,
			sender: Some(tx),
			create_empty: false,
			finalize: false,
		}).await.unwrap();
		let created_block = rx.await.unwrap().unwrap();
		pool_api.increment_nonce(Alice.into());
Web3 Philosopher's avatar
Web3 Philosopher committed

		// assert that the background task returns ok
		assert_eq!(
			created_block,
			CreatedBlock {
				hash: created_block.hash.clone(),
				aux: ImportedAux {
					header_only: false,
					clear_justification_requests: false,
					needs_justification: false,
					bad_justification: false,
					is_new_best: true
				}
			}
		);
		let block = client.block(&BlockId::Number(1)).unwrap().unwrap().block;
		pool_api.add_block(block, true);
		assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 1)).await.is_ok());
		let header = client.header(&BlockId::Number(1)).expect("db error").expect("imported above");
		pool.maintain(sp_transaction_pool::ChainEvent::NewBestBlock {
			hash: header.hash(),
Web3 Philosopher's avatar
Web3 Philosopher committed
		let (tx1, rx1) = futures::channel::oneshot::channel();
		assert!(sink.send(EngineCommand::SealNewBlock {
			parent_hash: Some(created_block.hash),
Web3 Philosopher's avatar
Web3 Philosopher committed
			sender: Some(tx1),
			create_empty: false,
			finalize: false,
		}).await.is_ok());
		assert_matches::assert_matches!(
			rx1.await.expect("should be no error receiving"),
			Ok(_)
		);
		let block = client.block(&BlockId::Number(2)).unwrap().unwrap().block;
		pool_api.add_block(block, true);
		pool_api.increment_nonce(Alice.into());
		assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Bob, 0)).await.is_ok());
Web3 Philosopher's avatar
Web3 Philosopher committed
		let (tx2, rx2) = futures::channel::oneshot::channel();
		assert!(sink.send(EngineCommand::SealNewBlock {
			parent_hash: Some(created_block.hash),
			sender: Some(tx2),
			create_empty: false,
			finalize: false,
		}).await.is_ok());
		let imported = rx2.await.unwrap().unwrap();
		// assert that fork block is in the db
		assert!(client.header(&BlockId::Hash(imported.hash)).unwrap().is_some())