diff --git a/substrate/client/consensus/manual-seal/src/finalize_block.rs b/substrate/client/consensus/manual-seal/src/finalize_block.rs
index b3b60e223805b857c23602ae718a2347c8cb3291..5780a25f97256331eb1d11b2370dd838ec112ac7 100644
--- a/substrate/client/consensus/manual-seal/src/finalize_block.rs
+++ b/substrate/client/consensus/manual-seal/src/finalize_block.rs
@@ -23,35 +23,40 @@ use sp_runtime::{
 	generic::BlockId,
 };
 use std::sync::Arc;
-use sc_client_api::backend::Backend as ClientBackend;
+use sc_client_api::backend::{Backend as ClientBackend, Finalizer};
+use std::marker::PhantomData;
 
 /// params for block finalization.
-pub struct FinalizeBlockParams<B: BlockT, CB> {
+pub struct FinalizeBlockParams<B: BlockT, F, CB> {
 	/// hash of the block
 	pub hash: <B as BlockT>::Hash,
 	/// sender to report errors/success to the rpc.
 	pub sender: rpc::Sender<()>,
 	/// finalization justification
 	pub justification: Option<Justification>,
-	/// client backend
-	pub backend: Arc<CB>,
+	/// Finalizer trait object.
+	pub finalizer: Arc<F>,
+	/// phantom type to pin the Backend type
+	pub _phantom: PhantomData<CB>,
 }
 
+
 /// finalizes a block in the backend with the given params.
-pub async fn finalize_block<B, CB>(params: FinalizeBlockParams<B, CB>)
+pub async fn finalize_block<B, F, CB>(params: FinalizeBlockParams<B, F, CB>)
 	where
 		B: BlockT,
+		F: Finalizer<B, CB>,
 		CB: ClientBackend<B>,
 {
 	let FinalizeBlockParams {
 		hash,
 		mut sender,
 		justification,
-		backend: back_end,
+		finalizer,
 		..
 	} = params;
 
-	match back_end.finalize_block(BlockId::Hash(hash), justification) {
+	match finalizer.finalize_block(BlockId::Hash(hash), justification, true) {
 		Err(e) => {
 			log::warn!("Failed to finalize block {:?}", e);
 			rpc::send_result(&mut sender, Err(e.into()))
diff --git a/substrate/client/consensus/manual-seal/src/lib.rs b/substrate/client/consensus/manual-seal/src/lib.rs
index f3a0ca887fd005ed34ade733ed13d9e8e68898b9..8294ae049f65808c4797afc9e34f2e6b423b6b39 100644
--- a/substrate/client/consensus/manual-seal/src/lib.rs
+++ b/substrate/client/consensus/manual-seal/src/lib.rs
@@ -17,66 +17,32 @@
 //! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks.
 //! This is suitable for a testing environment.
 
+use futures::prelude::*;
 use sp_consensus::{
-	self, BlockImport, Environment, Proposer, BlockCheckParams,
-	ForkChoiceStrategy, BlockImportParams, BlockOrigin,
-	ImportResult, SelectChain,
-	import_queue::{
-		BasicQueue,
-		CacheKeyId,
-		Verifier,
-		BoxBlockImport,
-	},
+	Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, SelectChain,
+	import_queue::{BasicQueue, CacheKeyId, Verifier, BoxBlockImport},
 };
+use sp_blockchain::HeaderBackend;
 use sp_inherents::InherentDataProviders;
 use sp_runtime::{traits::Block as BlockT, Justification};
-use sc_client_api::backend::Backend as ClientBackend;
-use futures::prelude::*;
+use sc_client_api::backend::{Backend as ClientBackend, Finalizer};
 use sc_transaction_pool::txpool;
-use std::collections::HashMap;
-use std::sync::Arc;
+use std::{sync::Arc, marker::PhantomData};
 
-pub mod rpc;
 mod error;
 mod finalize_block;
 mod seal_new_block;
-use finalize_block::{finalize_block, FinalizeBlockParams};
-use seal_new_block::{seal_new_block, SealBlockParams};
-pub use error::Error;
-pub use rpc::{EngineCommand, CreatedBlock};
-
-/// The synchronous block-import worker of the engine.
-pub struct ManualSealBlockImport<I> {
-	inner: I,
-}
-
-impl<I> From<I> for ManualSealBlockImport<I> {
-	fn from(i: I) -> Self {
-		ManualSealBlockImport { inner: i }
-	}
-}
-
-impl<B, I> BlockImport<B> for ManualSealBlockImport<I>
-	where
-		B: BlockT,
-		I: BlockImport<B, Transaction = ()>,
-{
-	type Error = I::Error;
-	type Transaction = ();
-
-	fn check_block(&mut self, block: BlockCheckParams<B>) -> Result<ImportResult, Self::Error>
-	{
-		self.inner.check_block(block)
-	}
+pub mod rpc;
 
-	fn import_block(
-		&mut self,
-		block: BlockImportParams<B, Self::Transaction>,
-		cache: HashMap<CacheKeyId, Vec<u8>>,
-	) -> Result<ImportResult, Self::Error> {
-		self.inner.import_block(block, cache)
-	}
-}
+use self::{
+	finalize_block::{finalize_block, FinalizeBlockParams},
+	seal_new_block::{seal_new_block, SealBlockParams},
+};
+pub use self::{
+	error::Error,
+	rpc::{EngineCommand, CreatedBlock},
+};
+use sc_client_api::{TransactionFor, Backend};
 
 /// The verifier for the manual seal engine; instantly finalizes.
 struct ManualSealVerifier;
@@ -92,7 +58,7 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {
 		let mut import_params = BlockImportParams::new(origin, header);
 		import_params.justification = justification;
 		import_params.body = body;
-		import_params.finalized = true;
+		import_params.finalized = false;
 		import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
 
 		Ok((import_params, None))
@@ -100,37 +66,43 @@ impl<B: BlockT> Verifier<B> for ManualSealVerifier {
 }
 
 /// Instantiate the import queue for the manual seal consensus engine.
-pub fn import_queue<B: BlockT>(block_import: BoxBlockImport<B, ()>) -> BasicQueue<B, ()>
+pub fn import_queue<Block, B>(
+	block_import: BoxBlockImport<Block, TransactionFor<B, Block>>
+) -> BasicQueue<Block, TransactionFor<B, Block>>
+	where
+		Block: BlockT,
+		B: Backend<Block> + 'static,
 {
 	BasicQueue::new(
 		ManualSealVerifier,
-		block_import,
+		Box::new(block_import),
 		None,
 		None,
 	)
 }
 
 /// Creates the background authorship task for the manual seal engine.
-pub async fn run_manual_seal<B, CB, E, A, C, S, T>(
+pub async fn run_manual_seal<B, CB, E, C, A, SC, S, T>(
 	mut block_import: BoxBlockImport<B, T>,
 	mut env: E,
-	backend: Arc<CB>,
+	client: Arc<C>,
 	pool: Arc<txpool::Pool<A>>,
-	mut seal_block_channel: S,
-	select_chain: C,
+	mut commands_stream: S,
+	select_chain: SC,
 	inherent_data_providers: InherentDataProviders,
 )
 	where
+		A: txpool::ChainApi<Block=B, Hash=<B as BlockT>::Hash> + 'static,
 		B: BlockT + 'static,
+		C: HeaderBackend<B> + Finalizer<B, CB> + 'static,
 		CB: ClientBackend<B> + 'static,
 		E: Environment<B> + 'static,
 		E::Error: std::fmt::Display,
 		<E::Proposer as Proposer<B>>::Error: std::fmt::Display,
-		A: txpool::ChainApi<Block=B, Hash=<B as BlockT>::Hash> + 'static,
 		S: Stream<Item=EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static,
-		C: SelectChain<B> + 'static,
+		SC: SelectChain<B> + 'static,
 {
-	while let Some(command) = seal_block_channel.next().await {
+	while let Some(command) = commands_stream.next().await {
 		match command {
 			EngineCommand::SealNewBlock {
 				create_empty,
@@ -149,7 +121,7 @@ pub async fn run_manual_seal<B, CB, E, A, C, S, T>(
 						block_import: &mut block_import,
 						inherent_data_provider: &inherent_data_providers,
 						pool: pool.clone(),
-						backend: backend.clone(),
+						client: client.clone(),
 					}
 				).await;
 			}
@@ -159,7 +131,8 @@ pub async fn run_manual_seal<B, CB, E, A, C, S, T>(
 						hash,
 						sender,
 						justification,
-						backend: backend.clone(),
+						finalizer: client.clone(),
+						_phantom: PhantomData,
 					}
 				).await
 			}
@@ -170,26 +143,28 @@ pub async fn run_manual_seal<B, CB, E, A, C, S, T>(
 /// runs the background authorship task for the instant seal engine.
 /// instant-seal creates a new block for every transaction imported into
 /// the transaction pool.
-pub async fn run_instant_seal<B, CB, E, A, C, T>(
+pub async fn run_instant_seal<B, CB, E, C, A, SC, T>(
 	block_import: BoxBlockImport<B, T>,
 	env: E,
-	backend: Arc<CB>,
+	client: Arc<C>,
 	pool: Arc<txpool::Pool<A>>,
-	select_chain: C,
+	select_chain: SC,
 	inherent_data_providers: InherentDataProviders,
 )
 	where
 		A: txpool::ChainApi<Block=B, Hash=<B as BlockT>::Hash> + 'static,
 		B: BlockT + 'static,
+		C: HeaderBackend<B> + Finalizer<B, CB> + 'static,
 		CB: ClientBackend<B> + 'static,
 		E: Environment<B> + 'static,
 		E::Error: std::fmt::Display,
 		<E::Proposer as Proposer<B>>::Error: std::fmt::Display,
-		C: SelectChain<B> + 'static
+		SC: SelectChain<B> + 'static
 {
 	// instant-seal creates blocks as soon as transactions are imported
 	// into the transaction pool.
-	let seal_block_channel = pool.validated_pool().import_notification_stream()
+	let commands_stream = pool.validated_pool()
+		.import_notification_stream()
 		.map(|_| {
 			EngineCommand::SealNewBlock {
 				create_empty: false,
@@ -202,9 +177,9 @@ pub async fn run_instant_seal<B, CB, E, A, C, T>(
 	run_manual_seal(
 		block_import,
 		env,
-		backend,
+		client,
 		pool,
-		seal_block_channel,
+		commands_stream,
 		select_chain,
 		inherent_data_providers,
 	).await
@@ -226,9 +201,7 @@ mod tests {
 	use substrate_test_runtime_transaction_pool::{TestApi, uxt};
 	use sp_transaction_pool::{TransactionPool, MaintainedTransactionPool, TransactionSource};
 	use sp_runtime::generic::BlockId;
-	use sp_blockchain::HeaderBackend;
 	use sp_consensus::ImportedAux;
-	use sc_client::LongestChain;
 	use sp_inherents::InherentDataProviders;
 	use sc_basic_authorship::ProposerFactory;
 
@@ -241,9 +214,8 @@ mod tests {
 	#[tokio::test]
 	async fn instant_seal() {
 		let builder = TestClientBuilder::new();
-		let backend = builder.backend();
-		let client = Arc::new(builder.build());
-		let select_chain = LongestChain::new(backend.clone());
+		let (client, select_chain) = builder.build_with_longest_chain();
+		let client = Arc::new(client);
 		let inherent_data_providers = InherentDataProviders::new();
 		let pool = Arc::new(BasicPool::new(Options::default(), api()).0);
 		let env = ProposerFactory::new(
@@ -268,7 +240,7 @@ mod tests {
 		let future = run_manual_seal(
 			Box::new(client.clone()),
 			env,
-			backend.clone(),
+			client.clone(),
 			pool.pool().clone(),
 			stream,
 			select_chain,
@@ -300,15 +272,14 @@ mod tests {
 			}
 		);
 		// assert that there's a new block in the db.
-		assert!(backend.blockchain().header(BlockId::Number(1)).unwrap().is_some())
+		assert!(client.header(&BlockId::Number(1)).unwrap().is_some())
 	}
 
 	#[tokio::test]
 	async fn manual_seal_and_finalization() {
 		let builder = TestClientBuilder::new();
-		let backend = builder.backend();
-		let client = Arc::new(builder.build());
-		let select_chain = LongestChain::new(backend.clone());
+		let (client, select_chain) = builder.build_with_longest_chain();
+		let client = Arc::new(client);
 		let inherent_data_providers = InherentDataProviders::new();
 		let pool = Arc::new(BasicPool::new(Options::default(), api()).0);
 		let env = ProposerFactory::new(
@@ -320,7 +291,7 @@ mod tests {
 		let future = run_manual_seal(
 			Box::new(client.clone()),
 			env,
-			backend.clone(),
+			client.clone(),
 			pool.pool().clone(),
 			stream,
 			select_chain,
@@ -360,7 +331,7 @@ mod tests {
 			}
 		);
 		// assert that there's a new block in the db.
-		let header = backend.blockchain().header(BlockId::Number(1)).unwrap().unwrap();
+		let header = client.header(&BlockId::Number(1)).unwrap().unwrap();
 		let (tx, rx) = futures::channel::oneshot::channel();
 		sink.send(EngineCommand::FinalizeBlock {
 			sender: Some(tx),
@@ -374,9 +345,8 @@ mod tests {
 	#[tokio::test]
 	async fn manual_seal_fork_blocks() {
 		let builder = TestClientBuilder::new();
-		let backend = builder.backend();
-		let client = Arc::new(builder.build());
-		let select_chain = LongestChain::new(backend.clone());
+		let (client, select_chain) = builder.build_with_longest_chain();
+		let client = Arc::new(client);
 		let inherent_data_providers = InherentDataProviders::new();
 		let pool_api = api();
 		let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone()).0);
@@ -389,7 +359,7 @@ mod tests {
 		let future = run_manual_seal(
 			Box::new(client.clone()),
 			env,
-			backend.clone(),
+			client.clone(),
 			pool.pool().clone(),
 			stream,
 			select_chain,
@@ -431,12 +401,12 @@ mod tests {
 			}
 		);
 		// assert that there's a new block in the db.
-		assert!(backend.blockchain().header(BlockId::Number(0)).unwrap().is_some());
+		assert!(client.header(&BlockId::Number(0)).unwrap().is_some());
 		assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 1)).await.is_ok());
 
 		pool.maintain(sp_transaction_pool::ChainEvent::NewBlock {
 			id: BlockId::Number(1),
-			header: backend.blockchain().header(BlockId::Number(1)).expect("db error").expect("imported above"),
+			header: client.header(&BlockId::Number(1)).expect("db error").expect("imported above"),
 			is_new_best: true,
 			retracted: vec![],
 		}).await;
@@ -452,7 +422,7 @@ mod tests {
 			rx1.await.expect("should be no error receiving"),
 			Ok(_)
 		);
-		assert!(backend.blockchain().header(BlockId::Number(1)).unwrap().is_some());
+		assert!(client.header(&BlockId::Number(1)).unwrap().is_some());
 		pool_api.increment_nonce(Alice.into());
 
 		assert!(pool.submit_one(&BlockId::Number(2), SOURCE, uxt(Alice, 2)).await.is_ok());
@@ -465,6 +435,6 @@ mod tests {
 		}).await.is_ok());
 		let imported = rx2.await.unwrap().unwrap();
 		// assert that fork block is in the db
-		assert!(backend.blockchain().header(BlockId::Hash(imported.hash)).unwrap().is_some())
+		assert!(client.header(&BlockId::Hash(imported.hash)).unwrap().is_some())
 	}
 }
diff --git a/substrate/client/consensus/manual-seal/src/seal_new_block.rs b/substrate/client/consensus/manual-seal/src/seal_new_block.rs
index 39d73e16ab74fdadde2e3a597fa2ef71615cc9fa..88b58ef4cc2b39cbbe3af1209e54408390e074ef 100644
--- a/substrate/client/consensus/manual-seal/src/seal_new_block.rs
+++ b/substrate/client/consensus/manual-seal/src/seal_new_block.rs
@@ -33,7 +33,6 @@ use sp_consensus::{
 	import_queue::BoxBlockImport,
 };
 use sp_blockchain::HeaderBackend;
-use sc_client_api::backend::Backend as ClientBackend;
 use std::collections::HashMap;
 use std::time::Duration;
 use sp_inherents::InherentDataProviders;
@@ -42,7 +41,7 @@ use sp_inherents::InherentDataProviders;
 const MAX_PROPOSAL_DURATION: u64 = 10;
 
 /// params for sealing a new block
-pub struct SealBlockParams<'a, B: BlockT, C, CB, E, T, P: txpool::ChainApi> {
+pub struct SealBlockParams<'a, B: BlockT, SC, HB, E, T, P: txpool::ChainApi> {
 	/// if true, empty blocks(without extrinsics) will be created.
 	/// otherwise, will return Error::EmptyTransactionPool.
 	pub create_empty: bool,
@@ -54,12 +53,12 @@ pub struct SealBlockParams<'a, B: BlockT, C, CB, E, T, P: txpool::ChainApi> {
 	pub sender: rpc::Sender<CreatedBlock<<B as BlockT>::Hash>>,
 	/// transaction pool
 	pub pool: Arc<txpool::Pool<P>>,
-	/// client backend
-	pub backend: Arc<CB>,
+	/// header backend
+	pub client: Arc<HB>,
 	/// Environment trait object for creating a proposer
 	pub env: &'a mut E,
 	/// SelectChain object
-	pub select_chain: &'a C,
+	pub select_chain: &'a SC,
 	/// block import object
 	pub block_import: &'a mut BoxBlockImport<B, T>,
 	/// inherent data provider
@@ -67,24 +66,24 @@ pub struct SealBlockParams<'a, B: BlockT, C, CB, E, T, P: txpool::ChainApi> {
 }
 
 /// seals a new block with the given params
-pub async fn seal_new_block<B, SC, CB, E, T, P>(
+pub async fn seal_new_block<B, SC, HB, E, T, P>(
 	SealBlockParams {
 		create_empty,
 		finalize,
 		pool,
 		parent_hash,
-		backend: back_end,
+		client,
 		select_chain,
 		block_import,
 		env,
 		inherent_data_provider,
 		mut sender,
 		..
-	}: SealBlockParams<'_, B, SC, CB, E, T, P>
+	}: SealBlockParams<'_, B, SC, HB, E, T, P>
 )
 	where
 		B: BlockT,
-		CB: ClientBackend<B>,
+		HB: HeaderBackend<B>,
 		E: Environment<B>,
 		<E as Environment<B>>::Error: std::fmt::Display,
 		<E::Proposer as Proposer<B>>::Error: std::fmt::Display,
@@ -101,7 +100,7 @@ pub async fn seal_new_block<B, SC, CB, E, T, P>(
 		// or fetch the best_block.
 		let header = match parent_hash {
 			Some(hash) => {
-				match back_end.blockchain().header(BlockId::Hash(hash))? {
+				match client.header(BlockId::Hash(hash))? {
 					Some(header) => header,
 					None => return Err(Error::BlockNotFound(format!("{}", hash))),
 				}