From 14e95f3398497c48d91663cee4691682460f2373 Mon Sep 17 00:00:00 2001
From: Nikolay Volf <nikvolf@gmail.com>
Date: Fri, 24 Jan 2020 04:21:24 -0800
Subject: [PATCH] Refactor tx-pool maintenance and other high-level api (#4629)

* Reduction.

* Reformation.

* add locked timer stuff

* fix issues and introduce full pool

* arrange together

* fix benches

* fix new_light

* Add revalidation test case

* review fixes

* review fixes

* use just ready future

* address review
---
 substrate/bin/node-template/src/service.rs    |  13 +-
 substrate/bin/node/cli/src/service.rs         |  24 +-
 substrate/client/service/src/builder.rs       |   6 +-
 substrate/client/service/src/lib.rs           |   8 +-
 .../transaction-pool/graph/benches/basics.rs  |  17 +-
 .../client/transaction-pool/graph/src/pool.rs |  22 +-
 .../graph/src/validated_pool.rs               |   4 +-
 substrate/client/transaction-pool/src/api.rs  |  42 +-
 substrate/client/transaction-pool/src/lib.rs  | 231 ++++++-
 .../client/transaction-pool/src/maintainer.rs | 645 ------------------
 .../client/transaction-pool/src/tests.rs      | 123 +++-
 .../primitives/transaction-pool/src/pool.rs   | 115 +---
 12 files changed, 423 insertions(+), 827 deletions(-)
 delete mode 100644 substrate/client/transaction-pool/src/maintainer.rs

diff --git a/substrate/bin/node-template/src/service.rs b/substrate/bin/node-template/src/service.rs
index ed2299e30f7..458656d836d 100644
--- a/substrate/bin/node-template/src/service.rs
+++ b/substrate/bin/node-template/src/service.rs
@@ -43,9 +43,7 @@ macro_rules! new_full_start {
 			.with_transaction_pool(|config, client, _fetcher| {
 				let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
 				let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
-				let maintainer = sc_transaction_pool::FullBasicPoolMaintainer::new(pool.pool().clone(), client);
-				let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
-				Ok(maintainable_pool)
+				Ok(pool)
 			})?
 			.with_import_queue(|_config, client, mut select_chain, transaction_pool| {
 				let select_chain = select_chain.take()
@@ -207,11 +205,12 @@ pub fn new_light<C: Send + Default + 'static>(config: Configuration<C, GenesisCo
 		.with_transaction_pool(|config, client, fetcher| {
 			let fetcher = fetcher
 				.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;
+
 			let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
-			let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
-			let maintainer = sc_transaction_pool::LightBasicPoolMaintainer::with_defaults(pool.pool().clone(), client, fetcher);
-			let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
-			Ok(maintainable_pool)
+			let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
+				config, pool_api, sc_transaction_pool::RevalidationType::Light,
+			);
+			Ok(pool)
 		})?
 		.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
 			let fetch_checker = fetcher
diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs
index 603df5b7d96..75cb6ac4c48 100644
--- a/substrate/bin/node/cli/src/service.rs
+++ b/substrate/bin/node/cli/src/service.rs
@@ -64,9 +64,7 @@ macro_rules! new_full_start {
 			.with_transaction_pool(|config, client, _fetcher| {
 				let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
 				let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
-				let maintainer = sc_transaction_pool::FullBasicPoolMaintainer::new(pool.pool().clone(), client);
-				let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
-				Ok(maintainable_pool)
+				Ok(pool)
 			})?
 			.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
 				let select_chain = select_chain.take()
@@ -272,15 +270,9 @@ type ConcreteClient =
 #[allow(dead_code)]
 type ConcreteBackend = Backend<ConcreteBlock>;
 #[allow(dead_code)]
-type ConcreteTransactionPool = sp_transaction_pool::MaintainableTransactionPool<
-	sc_transaction_pool::BasicPool<
-		sc_transaction_pool::FullChainApi<ConcreteClient, ConcreteBlock>,
-		ConcreteBlock
-	>,
-	sc_transaction_pool::FullBasicPoolMaintainer<
-		ConcreteClient,
-		sc_transaction_pool::FullChainApi<ConcreteClient, Block>
-	>
+type ConcreteTransactionPool = sc_transaction_pool::BasicPool<
+	sc_transaction_pool::FullChainApi<ConcreteClient, ConcreteBlock>,
+	ConcreteBlock
 >;
 
 /// A specialized configuration object for setting up the node..
@@ -322,10 +314,10 @@ pub fn new_light<C: Send + Default + 'static>(config: NodeConfiguration<C>)
 			let fetcher = fetcher
 				.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;
 			let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
-			let pool = sc_transaction_pool::BasicPool::new(config, pool_api);
-			let maintainer = sc_transaction_pool::LightBasicPoolMaintainer::with_defaults(pool.pool().clone(), client, fetcher);
-			let maintainable_pool = sp_transaction_pool::MaintainableTransactionPool::new(pool, maintainer);
-			Ok(maintainable_pool)
+			let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
+				config, pool_api, sc_transaction_pool::RevalidationType::Light,
+			);
+			Ok(pool)
 		})?
 		.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
 			let fetch_checker = fetcher
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index 044798701c6..194bd09e24b 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -49,7 +49,7 @@ use std::{
 };
 use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
 use sc_telemetry::{telemetry, SUBSTRATE_INFO};
-use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer};
+use sp_transaction_pool::MaintainedTransactionPool;
 use sp_blockchain;
 use grafana_data_source::{self, record_metrics};
 
@@ -740,9 +740,7 @@ ServiceBuilder<
 	TSc: Clone,
 	TImpQu: 'static + ImportQueue<TBl>,
 	TNetP: NetworkSpecialization<TBl>,
-	TExPool: 'static
-		+ TransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>
-		+ TransactionPoolMaintainer<Block=TBl, Hash = <TBl as BlockT>::Hash>,
+	TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> + 'static,
 	TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata> + Clone,
 {
 
diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs
index c1b87e44919..1b2e7bcd3cc 100644
--- a/substrate/client/service/src/lib.rs
+++ b/substrate/client/service/src/lib.rs
@@ -61,7 +61,7 @@ pub use self::builder::{
 };
 pub use config::{Configuration, Roles, PruningMode};
 pub use sc_chain_spec::{ChainSpec, Properties, RuntimeGenesis, Extension as ChainSpecExtension};
-pub use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer, InPoolTransaction, error::IntoPoolError};
+pub use sp_transaction_pool::{TransactionPool, InPoolTransaction, error::IntoPoolError};
 pub use sc_transaction_pool::txpool::Options as TransactionPoolOptions;
 pub use sc_client::FinalityNotifications;
 pub use sc_rpc::Metadata as RpcMetadata;
@@ -148,8 +148,7 @@ pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
 	/// Chain selection algorithm.
 	type SelectChain: sp_consensus::SelectChain<Self::Block>;
 	/// Transaction pool.
-	type TransactionPool: TransactionPool<Block = Self::Block>
-		+ TransactionPoolMaintainer<Block = Self::Block>;
+	type TransactionPool: TransactionPool<Block = Self::Block>;
 	/// Network specialization.
 	type NetworkSpecialization: NetworkSpecialization<Self::Block>;
 
@@ -213,8 +212,7 @@ where
 	TExec: 'static + sc_client::CallExecutor<TBl> + Send + Sync + Clone,
 	TRtApi: 'static + Send + Sync,
 	TSc: sp_consensus::SelectChain<TBl> + 'static + Clone + Send + Unpin,
-	TExPool: 'static + TransactionPool<Block = TBl>
-		+ TransactionPoolMaintainer<Block = TBl>,
+	TExPool: 'static + TransactionPool<Block = TBl>,
 	TOc: 'static + Send + Sync,
 	TNetSpec: NetworkSpecialization<TBl>,
 {
diff --git a/substrate/client/transaction-pool/graph/benches/basics.rs b/substrate/client/transaction-pool/graph/benches/basics.rs
index 557a2ca3d1f..75d15cc1f19 100644
--- a/substrate/client/transaction-pool/graph/benches/basics.rs
+++ b/substrate/client/transaction-pool/graph/benches/basics.rs
@@ -16,7 +16,7 @@
 
 use criterion::{criterion_group, criterion_main, Criterion};
 
-use futures::executor::block_on;
+use futures::{future::{ready, Ready}, executor::block_on};
 use sc_transaction_graph::*;
 use sp_runtime::transaction_validity::{ValidTransaction, InvalidTransaction};
 use codec::Encode;
@@ -49,7 +49,8 @@ impl ChainApi for TestApi {
 	type Block = Block;
 	type Hash = H256;
 	type Error = sp_transaction_pool::error::Error;
-	type ValidationFuture = futures::future::Ready<sp_transaction_pool::error::Result<TransactionValidity>>;
+	type ValidationFuture = Ready<sp_transaction_pool::error::Result<TransactionValidity>>;
+	type BodyFuture = Ready<sp_transaction_pool::error::Result<Option<Vec<Extrinsic>>>>;
 
 	fn validate_transaction(
 		&self,
@@ -61,14 +62,14 @@ impl ChainApi for TestApi {
 
 		match self.block_id_to_number(at) {
 			Ok(Some(num)) if num > 5 => {
-				return futures::future::ready(
+				return ready(
 					Ok(Err(InvalidTransaction::Stale.into()))
 				)
 			},
 			_ => {},
 		}
 
-		futures::future::ready(
+		ready(
 			Ok(Ok(ValidTransaction {
 				priority: 4,
 				requires: if nonce > 1 && self.nonce_dependant {
@@ -105,6 +106,10 @@ impl ChainApi for TestApi {
 		let encoded = uxt.encode();
 		(blake2_256(&encoded).into(), encoded.len())
 	}
+
+	fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
+		ready(Ok(None))
+	}
 }
 
 fn uxt(transfer: Transfer) -> Extrinsic {
@@ -150,13 +155,13 @@ fn benchmark_main(c: &mut Criterion) {
 
     c.bench_function("sequential 50 tx", |b| {
 		b.iter(|| {
-			bench_configured(Pool::new(Default::default(), TestApi::new_dependant()), 50);
+			bench_configured(Pool::new(Default::default(), TestApi::new_dependant().into()), 50);
 		});
 	});
 
 	c.bench_function("random 100 tx", |b| {
 		b.iter(|| {
-			bench_configured(Pool::new(Default::default(), TestApi::default()), 100);
+			bench_configured(Pool::new(Default::default(), TestApi::default().into()), 100);
 		});
 	});
 }
diff --git a/substrate/client/transaction-pool/graph/src/pool.rs b/substrate/client/transaction-pool/graph/src/pool.rs
index 629bd0a9a93..5be879f079a 100644
--- a/substrate/client/transaction-pool/graph/src/pool.rs
+++ b/substrate/client/transaction-pool/graph/src/pool.rs
@@ -68,6 +68,8 @@ pub trait ChainApi: Send + Sync {
 	type Error: From<error::Error> + error::IntoPoolError;
 	/// Validate transaction future.
 	type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send + Unpin;
+	/// Body future (since block body might be remote)
+	type BodyFuture: Future<Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>> + Unpin + Send + 'static;
 
 	/// Verify extrinsic at given block.
 	fn validate_transaction(
@@ -84,6 +86,9 @@ pub trait ChainApi: Send + Sync {
 
 	/// Returns hash and encoding length of the extrinsic.
 	fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (Self::Hash, usize);
+
+	/// Returns a block body given the block id.
+	fn block_body(&self, at: &BlockId<Self::Block>) -> Self::BodyFuture;
 }
 
 /// Pool configuration options.
@@ -120,7 +125,7 @@ pub struct Pool<B: ChainApi> {
 
 impl<B: ChainApi> Pool<B> {
 	/// Create a new transaction pool.
-	pub fn new(options: Options, api: B) -> Self {
+	pub fn new(options: Options, api: Arc<B>) -> Self {
 		Pool {
 			validated_pool: Arc::new(ValidatedPool::new(options, api)),
 		}
@@ -488,6 +493,7 @@ mod tests {
 		type Hash = u64;
 		type Error = error::Error;
 		type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
+		type BodyFuture = futures::future::Ready<error::Result<Option<Vec<Extrinsic>>>>;
 
 		/// Verify extrinsic at given block.
 		fn validate_transaction(
@@ -560,6 +566,10 @@ mod tests {
 				len
 			)
 		}
+
+		fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
+			futures::future::ready(Ok(None))
+		}
 	}
 
 	fn uxt(transfer: Transfer) -> Extrinsic {
@@ -567,7 +577,7 @@ mod tests {
 	}
 
 	fn pool() -> Pool<TestApi> {
-		Pool::new(Default::default(), TestApi::default())
+		Pool::new(Default::default(), TestApi::default().into())
 	}
 
 	#[test]
@@ -713,7 +723,7 @@ mod tests {
 			ready: limit.clone(),
 			future: limit.clone(),
 			..Default::default()
-		}, TestApi::default());
+		}, TestApi::default().into());
 
 		let hash1 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
 			from: AccountId::from_h256(H256::from_low_u64_be(1)),
@@ -748,7 +758,7 @@ mod tests {
 			ready: limit.clone(),
 			future: limit.clone(),
 			..Default::default()
-		}, TestApi::default());
+		}, TestApi::default().into());
 
 		// when
 		block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer {
@@ -924,7 +934,7 @@ mod tests {
 				ready: limit.clone(),
 				future: limit.clone(),
 				..Default::default()
-			}, TestApi::default());
+			}, TestApi::default().into());
 
 			let xt = uxt(Transfer {
 				from: AccountId::from_h256(H256::from_low_u64_be(1)),
@@ -958,7 +968,7 @@ mod tests {
 			let (tx, rx) = std::sync::mpsc::sync_channel(1);
 			let mut api = TestApi::default();
 			api.delay = Arc::new(Mutex::new(rx.into()));
-			let pool = Arc::new(Pool::new(Default::default(), api));
+			let pool = Arc::new(Pool::new(Default::default(), api.into()));
 
 			// when
 			let xt = uxt(Transfer {
diff --git a/substrate/client/transaction-pool/graph/src/validated_pool.rs b/substrate/client/transaction-pool/graph/src/validated_pool.rs
index 29f82fb894a..34f34d58068 100644
--- a/substrate/client/transaction-pool/graph/src/validated_pool.rs
+++ b/substrate/client/transaction-pool/graph/src/validated_pool.rs
@@ -63,7 +63,7 @@ pub type ValidatedTransactionFor<B> = ValidatedTransaction<
 
 /// Pool that deals with validated transactions.
 pub(crate) struct ValidatedPool<B: ChainApi> {
-	api: B,
+	api: Arc<B>,
 	options: Options,
 	listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
 	pool: RwLock<base::BasePool<
@@ -76,7 +76,7 @@ pub(crate) struct ValidatedPool<B: ChainApi> {
 
 impl<B: ChainApi> ValidatedPool<B> {
 	/// Create a new transaction pool.
-	pub fn new(options: Options, api: B) -> Self {
+	pub fn new(options: Options, api: Arc<B>) -> Self {
 		let base_pool = base::BasePool::new(options.reject_future_transactions);
 		ValidatedPool {
 			api,
diff --git a/substrate/client/transaction-pool/src/api.rs b/substrate/client/transaction-pool/src/api.rs
index 8495b8b65f1..1bf63482148 100644
--- a/substrate/client/transaction-pool/src/api.rs
+++ b/substrate/client/transaction-pool/src/api.rs
@@ -19,12 +19,13 @@
 use std::{marker::PhantomData, pin::Pin, sync::Arc};
 use codec::{Decode, Encode};
 use futures::{
-	channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready},
+	channel::oneshot, executor::{ThreadPool, ThreadPoolBuilder}, future::{Future, FutureExt, ready, Ready},
 };
 
 use sc_client_api::{
 	blockchain::HeaderBackend,
-	light::{Fetcher, RemoteCallRequest}
+	light::{Fetcher, RemoteCallRequest, RemoteBodyRequest},
+	BlockBody,
 };
 use sp_core::Hasher;
 use sp_runtime::{
@@ -63,7 +64,7 @@ impl<Client, Block> FullChainApi<Client, Block> where
 
 impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Block> where
 	Block: BlockT,
-	Client: ProvideRuntimeApi<Block> + BlockIdTo<Block> + 'static + Send + Sync,
+	Client: ProvideRuntimeApi<Block> + BlockBody<Block> + BlockIdTo<Block> + 'static + Send + Sync,
 	Client::Api: TaggedTransactionQueue<Block>,
 	sp_api::ApiErrorFor<Client, Block>: Send,
 {
@@ -71,6 +72,11 @@ impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Bloc
 	type Hash = Block::Hash;
 	type Error = error::Error;
 	type ValidationFuture = Pin<Box<dyn Future<Output = error::Result<TransactionValidity>> + Send>>;
+	type BodyFuture = Ready<error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>>;
+
+	fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture {
+		ready(self.client.block_body(&id).map_err(|e| error::Error::from(e)))
+	}
 
 	fn validate_transaction(
 		&self,
@@ -149,6 +155,7 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for LightChainApi<Client,
 	type Hash = Block::Hash;
 	type Error = error::Error;
 	type ValidationFuture = Box<dyn Future<Output = error::Result<TransactionValidity>> + Send + Unpin>;
+	type BodyFuture = Pin<Box<dyn Future<Output = error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>> + Send>>;
 
 	fn validate_transaction(
 		&self,
@@ -197,4 +204,33 @@ impl<Client, F, Block> sc_transaction_graph::ChainApi for LightChainApi<Client,
 			(<<Block::Header as HeaderT>::Hashing as HashT>::hash(x), x.len())
 		})
 	}
+
+	fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture {
+		let header = self.client.header(*id)
+			.and_then(|h| h.ok_or(sp_blockchain::Error::UnknownBlock(format!("{}", id))));
+		let header = match header {
+			Ok(header) => header,
+			Err(err) => {
+				log::warn!(target: "txpool", "Failed to query header: {:?}", err);
+				return Box::pin(ready(Ok(None)));
+			}
+		};
+
+		let fetcher = self.fetcher.clone();
+		async move {
+			let transactions = fetcher.remote_body({
+					RemoteBodyRequest {
+						header,
+						retry_count: None,
+					}
+				})
+				.await
+				.unwrap_or_else(|e| {
+					log::warn!(target: "txpool", "Failed to fetch block body: {:?}", e);
+					Vec::new()
+				});
+
+			Ok(Some(transactions))
+		}.boxed()
+	}
 }
diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs
index 4d71307c0ab..f6f7774935b 100644
--- a/substrate/client/transaction-pool/src/lib.rs
+++ b/substrate/client/transaction-pool/src/lib.rs
@@ -20,26 +20,26 @@
 #![warn(unused_extern_crates)]
 
 mod api;
-mod maintainer;
-
 pub mod error;
+
 #[cfg(test)]
 mod tests;
 
 pub use sc_transaction_graph as txpool;
 pub use crate::api::{FullChainApi, LightChainApi};
-pub use crate::maintainer::{FullBasicPoolMaintainer, LightBasicPoolMaintainer};
 
-use std::{collections::HashMap, sync::Arc};
-use futures::{Future, FutureExt};
+use std::{collections::HashMap, sync::Arc, pin::Pin, time::Instant};
+use futures::{Future, FutureExt, future::ready};
+use parking_lot::Mutex;
 
 use sp_runtime::{
 	generic::BlockId,
-	traits::Block as BlockT,
+	traits::{Block as BlockT, NumberFor, SimpleArithmetic, Extrinsic},
 };
 use sp_transaction_pool::{
 	TransactionPool, PoolStatus, ImportNotificationStream,
-	TxHash, TransactionFor, TransactionStatusStreamFor,
+	TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash,
+	MaintainedTransactionPool,
 };
 
 /// Basic implementation of transaction pool that can be customized by providing PoolApi.
@@ -49,6 +49,25 @@ pub struct BasicPool<PoolApi, Block>
 		PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
 {
 	pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
+	api: Arc<PoolApi>,
+	revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
+}
+
+/// Type of revalidation.
+pub enum RevalidationType {
+	/// Light revalidation type.
+	///
+	/// During maintenance, transaction pool makes periodic revalidation
+	/// of all transactions depending on number of blocks or time passed.
+	/// Also this kind of revalidation does not resubmit transactions from
+	/// retracted blocks, since it is too expensive.
+	Light,
+
+	/// Full revalidation type.
+	///
+	/// During maintenance, transaction pool revalidates some fixed amount of
+	/// transactions from the pool of valid transactions.
+	Full,
 }
 
 impl<PoolApi, Block> BasicPool<PoolApi, Block>
@@ -57,16 +76,44 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
 		PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
 {
 	/// Create new basic transaction pool with provided api.
-	pub fn new(options: sc_transaction_graph::Options, pool_api: PoolApi) -> Self {
+	pub fn new(
+		options: sc_transaction_graph::Options,
+		pool_api: PoolApi,
+	) -> Self {
+		Self::with_revalidation_type(options, pool_api, RevalidationType::Full)
+	}
+
+	/// Create new basic transaction pool with provided api and custom
+	/// revalidation type.
+	pub fn with_revalidation_type(
+		options: sc_transaction_graph::Options,
+		pool_api: PoolApi,
+		revalidation_type: RevalidationType,
+	) -> Self {
+		let api = Arc::new(pool_api);
+		let cloned_api = api.clone();
 		BasicPool {
-			pool: Arc::new(sc_transaction_graph::Pool::new(options, pool_api)),
+			api: cloned_api,
+			pool: Arc::new(sc_transaction_graph::Pool::new(options, api)),
+			revalidation_strategy: Arc::new(Mutex::new(
+				match revalidation_type {
+					RevalidationType::Light => RevalidationStrategy::Light(RevalidationStatus::NotScheduled),
+					RevalidationType::Full => RevalidationStrategy::Always,
+				}
+			)),
 		}
+
 	}
 
 	/// Gets shared reference to the underlying pool.
 	pub fn pool(&self) -> &Arc<sc_transaction_graph::Pool<PoolApi>> {
 		&self.pool
 	}
+
+	#[cfg(test)]
+	pub fn api(&self) -> &Arc<PoolApi> {
+		&self.api
+	}
 }
 
 impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
@@ -130,3 +177,169 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
 		self.pool.on_broadcasted(propagations)
 	}
 }
+
+#[cfg_attr(test, derive(Debug))]
+enum RevalidationStatus<N> {
+	/// The revalidation has never been completed.
+	NotScheduled,
+	/// The revalidation is scheduled.
+	Scheduled(Option<std::time::Instant>, Option<N>),
+	/// The revalidation is in progress.
+	InProgress,
+}
+
+enum RevalidationStrategy<N> {
+	Always,
+	Light(RevalidationStatus<N>)
+}
+
+struct RevalidationAction {
+	revalidate: bool,
+	resubmit: bool,
+	revalidate_amount: Option<usize>,
+}
+
+impl<N: Clone + Copy + SimpleArithmetic> RevalidationStrategy<N> {
+	pub fn clear(&mut self) {
+		if let Self::Light(status) = self {
+			status.clear()
+		}
+	}
+
+	pub fn next(
+		&mut self,
+		block: N,
+		revalidate_time_period: Option<std::time::Duration>,
+		revalidate_block_period: Option<N>,
+	) -> RevalidationAction {
+		match self {
+			Self::Light(status) => RevalidationAction {
+				revalidate: status.next_required(
+					block,
+					revalidate_time_period,
+					revalidate_block_period
+				),
+				resubmit: false,
+				revalidate_amount: None,
+			},
+			Self::Always => RevalidationAction {
+				revalidate: true,
+				resubmit: true,
+				revalidate_amount: Some(16),
+			}
+		}
+	}
+}
+
+impl<N: Clone + Copy + SimpleArithmetic> RevalidationStatus<N> {
+	/// Called when revalidation is completed.
+	pub fn clear(&mut self) {
+		*self = Self::NotScheduled;
+	}
+
+	/// Returns true if revalidation is required.
+	pub fn next_required(
+		&mut self,
+		block: N,
+		revalidate_time_period: Option<std::time::Duration>,
+		revalidate_block_period: Option<N>,
+	) -> bool {
+		match *self {
+			Self::NotScheduled => {
+				*self = Self::Scheduled(
+					revalidate_time_period.map(|period| Instant::now() + period),
+					revalidate_block_period.map(|period| block + period),
+				);
+				false
+			},
+			Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
+				let is_required = revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false)
+					|| revalidate_at_block.map(|at| block >= at).unwrap_or(false);
+				if is_required {
+					*self = Self::InProgress;
+				}
+				is_required
+			},
+			Self::InProgress => false,
+		}
+	}
+}
+
+impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
+where
+	Block: BlockT,
+	PoolApi: 'static + sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash, Error=error::Error>,
+{
+	fn maintain(&self, id: &BlockId<Self::Block>, retracted: &[BlockHash<Self>])
+		-> Pin<Box<dyn Future<Output=()> + Send>>
+	{
+		let id = id.clone();
+		let pool = self.pool.clone();
+		let api = self.api.clone();
+
+		let block_number = match api.block_id_to_number(&id) {
+			Ok(Some(number)) => number,
+			_ => {
+				log::trace!(target: "txqueue", "Skipping chain event - no number for that block {:?}", id);
+				return Box::pin(ready(()));
+			}
+		};
+
+		let next_action = self.revalidation_strategy.lock().next(
+			block_number,
+			Some(std::time::Duration::from_secs(60)),
+			Some(20.into()),
+		);
+		let revalidation_strategy = self.revalidation_strategy.clone();
+		let retracted = retracted.to_vec();
+
+		async move {
+			// We don't query block if we won't prune anything
+			if !pool.status().is_empty() {
+				let hashes = api.block_body(&id).await
+					.unwrap_or_else(|e| {
+						log::warn!("Prune known transactions: error request {:?}!", e);
+						None
+					})
+				.unwrap_or_default()
+				.into_iter()
+				.map(|tx| pool.hash_of(&tx))
+				.collect::<Vec<_>>();
+
+				if let Err(e) = pool.prune_known(&id, &hashes) {
+					log::error!("Cannot prune known in the pool {:?}!", e);
+				}
+			}
+
+			if next_action.resubmit {
+				let mut resubmit_transactions = Vec::new();
+
+				for retracted_hash in retracted {
+					let block_transactions = api.block_body(&BlockId::hash(retracted_hash.clone())).await
+						.unwrap_or_else(|e| {
+							log::warn!("Failed to fetch block body {:?}!", e);
+							None
+						})
+						.unwrap_or_default()
+						.into_iter()
+						.filter(|tx| tx.is_signed().unwrap_or(true));
+
+					resubmit_transactions.extend(block_transactions);
+				}
+				if let Err(e) = pool.submit_at(&id, resubmit_transactions, true).await {
+					log::debug!(target: "txpool",
+						"[{:?}] Error re-submitting transactions: {:?}", id, e
+					)
+				}
+			}
+
+			if next_action.revalidate {
+				if let Err(e) = pool.revalidate_ready(&id, next_action.revalidate_amount).await {
+					log::warn!("Revalidate ready failed {:?}", e);
+				}
+			}
+
+			revalidation_strategy.lock().clear();
+		}.boxed()
+	}
+}
diff --git a/substrate/client/transaction-pool/src/maintainer.rs b/substrate/client/transaction-pool/src/maintainer.rs
deleted file mode 100644
index 97dc7e10a6f..00000000000
--- a/substrate/client/transaction-pool/src/maintainer.rs
+++ /dev/null
@@ -1,645 +0,0 @@
-// Copyright 2019-2020 Parity Technologies (UK) Ltd.
-// This file is part of Substrate.
-
-// Substrate is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Substrate is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
-
-use std::{
-	marker::{PhantomData, Unpin},
-	sync::Arc,
-	time::Instant,
-};
-use futures::{
-	Future, FutureExt,
-	future::{Either, join, ready},
-};
-use log::{warn, debug, trace};
-use parking_lot::Mutex;
-
-use sc_client_api::{
-	client::BlockBody,
-	light::{Fetcher, RemoteBodyRequest},
-};
-use sp_runtime::{
-	generic::BlockId,
-	traits::{Block as BlockT, Extrinsic, Header, NumberFor, SimpleArithmetic},
-};
-use sp_blockchain::HeaderBackend;
-use sp_transaction_pool::{TransactionPoolMaintainer, runtime_api::TaggedTransactionQueue};
-use sp_api::ProvideRuntimeApi;
-
-use sc_transaction_graph::{self, ChainApi};
-
-/// Basic transaction pool maintainer for full clients.
-pub struct FullBasicPoolMaintainer<Client, PoolApi: ChainApi> {
-	pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
-	client: Arc<Client>,
-}
-
-impl<Client, PoolApi: ChainApi> FullBasicPoolMaintainer<Client, PoolApi> {
-	/// Create new basic full pool maintainer.
-	pub fn new(
-		pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
-		client: Arc<Client>,
-	) -> Self {
-		FullBasicPoolMaintainer { pool, client }
-	}
-}
-
-impl<Block, Client, PoolApi> TransactionPoolMaintainer
-for
-	FullBasicPoolMaintainer<Client, PoolApi>
-where
-	Block: BlockT,
-	Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + 'static,
-	Client::Api: TaggedTransactionQueue<Block>,
-	PoolApi: ChainApi<Block = Block, Hash = Block::Hash> + 'static,
-{
-	type Block = Block;
-	type Hash = Block::Hash;
-
-	fn maintain(
-		&self,
-		id: &BlockId<Block>,
-		retracted: &[Block::Hash],
-	) -> Box<dyn Future<Output=()> + Send + Unpin> {
-		let now = std::time::Instant::now();
-		let took = move || format!("Took {} ms", now.elapsed().as_millis());
-
-		let id = *id;
-		trace!(target: "txpool", "[{:?}] Starting pool maintainance", id);
-		// Put transactions from retracted blocks back into the pool.
-		let client_copy = self.client.clone();
-		let retracted_transactions = retracted.to_vec().into_iter()
-			.filter_map(move |hash| client_copy.block_body(&BlockId::hash(hash)).ok().unwrap_or(None))
-			.flat_map(|block| block.into_iter())
-			// if signed information is not present, attempt to resubmit anyway.
-			.filter(|tx| tx.is_signed().unwrap_or(true));
-		let resubmit_future = self.pool
-			.submit_at(&id, retracted_transactions, true)
-			.then(move |resubmit_result| ready(match resubmit_result {
-				Ok(_) => trace!(target: "txpool",
-					"[{:?}] Re-submitting retracted done. {}", id, took()
-				),
-				Err(e) => debug!(target: "txpool",
-					"[{:?}] Error re-submitting transactions: {:?}", id, e
-				),
-			}));
-
-		// Avoid calling into runtime if there is nothing to prune from the pool anyway.
-		if self.pool.status().is_empty() {
-			return Box::new(resubmit_future)
-		}
-
-		let block = (self.client.header(id), self.client.block_body(&id));
-		let prune_future = match block {
-			(Ok(Some(header)), Ok(Some(extrinsics))) => {
-				let parent_id = BlockId::hash(*header.parent_hash());
-				let prune_future = self.pool
-					.prune(&id, &parent_id, &extrinsics)
-					.then(move |prune_result| ready(match prune_result {
-						Ok(_) => trace!(target: "txpool",
-							"[{:?}] Pruning done. {}", id, took()
-						),
-						Err(e) => warn!(target: "txpool",
-							"[{:?}] Error pruning transactions: {:?}", id, e
-						),
-					}));
-
-				Either::Left(resubmit_future.then(|_| prune_future))
-			},
-			(Ok(_), Ok(_)) => Either::Right(resubmit_future),
-			err => {
-				warn!(target: "txpool", "[{:?}] Error reading block: {:?}", id, err);
-				Either::Right(resubmit_future)
-			},
-		};
-
-		let revalidate_future = self.pool
-			.revalidate_ready(&id, Some(16))
-			.then(move |result| ready(match result {
-				Ok(_) => debug!(target: "txpool",
-					"[{:?}] Revalidation done: {}", id, took()
-				),
-				Err(e) => warn!(target: "txpool",
-					"[{:?}] Encountered errors while revalidating transactions: {:?}", id, e
-				),
-			}));
-
-		Box::new(prune_future.then(|_| revalidate_future))
-	}
-}
-
-/// Basic transaction pool maintainer for light clients.
-pub struct LightBasicPoolMaintainer<Block: BlockT, Client, PoolApi: ChainApi, F> {
-	pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
-	client: Arc<Client>,
-	fetcher: Arc<F>,
-	revalidate_time_period: Option<std::time::Duration>,
-	revalidate_block_period: Option<NumberFor<Block>>,
-	revalidation_status: Arc<Mutex<TxPoolRevalidationStatus<NumberFor<Block>>>>,
-	_phantom: PhantomData<Block>,
-}
-
-impl<Block, Client, PoolApi, F> LightBasicPoolMaintainer<Block, Client, PoolApi, F>
-	where
-		Block: BlockT,
-		Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + 'static,
-		Client::Api: TaggedTransactionQueue<Block>,
-		PoolApi: ChainApi<Block = Block, Hash = Block::Hash> + 'static,
-		F: Fetcher<Block> + 'static,
-{
-	/// Create light pool maintainer with default constants.
-	///
-	/// Default constants are: revalidate every 60 seconds or every 20 blocks
-	/// (whatever happens first).
-	pub fn with_defaults(
-		pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
-		client: Arc<Client>,
-		fetcher: Arc<F>,
-	) -> Self {
-		Self::new(
-			pool,
-			client,
-			fetcher,
-			Some(std::time::Duration::from_secs(60)),
-			Some(20.into()),
-		)
-	}
-
-	/// Create light pool maintainer with passed constants.
-	pub fn new(
-		pool: Arc<sc_transaction_graph::Pool<PoolApi>>,
-		client: Arc<Client>,
-		fetcher: Arc<F>,
-		revalidate_time_period: Option<std::time::Duration>,
-		revalidate_block_period: Option<NumberFor<Block>>,
-	) -> Self {
-		Self {
-			pool,
-			client,
-			fetcher,
-			revalidate_time_period,
-			revalidate_block_period,
-			revalidation_status: Arc::new(Mutex::new(TxPoolRevalidationStatus::NotScheduled)),
-			_phantom: Default::default(),
-		}
-	}
-
-	/// Returns future that prunes block transactions from the pool.
-	fn prune(
-		&self,
-		id: &BlockId<Block>,
-		header: &Block::Header,
-	) -> impl std::future::Future<Output = ()> {
-		// fetch transactions (possible future optimization: proofs of inclusion) that
-		// have been included into new block and prune these from the pool
-		let id = id.clone();
-		let pool = self.pool.clone();
-		self.fetcher.remote_body(RemoteBodyRequest {
-			header: header.clone(),
-			retry_count: None,
-		})
-		.then(move |transactions| ready(
-			transactions
-				.map_err(|e| format!("{}", e))
-				.and_then(|transactions| {
-					let hashes = transactions
-						.into_iter()
-						.map(|tx| pool.hash_of(&tx))
-						.collect::<Vec<_>>();
-					pool.prune_known(&id, &hashes)
-						.map_err(|e| format!("{}", e))
-				})
-		))
-		.then(|r| {
-			if let Err(e) = r {
-				warn!("Error pruning known transactions: {}", e)
-			}
-			ready(())
-		})
-	}
-
-	/// Returns future that performs in-pool transations revalidation, if required.
-	fn revalidate(
-		&self,
-		id: &BlockId<Block>,
-		header: &Block::Header,
-	) -> impl std::future::Future<Output = ()> {
-		// to determine whether ready transaction is still valid, we perform periodic revalidaton
-		// of ready transactions
-		let is_revalidation_required = self.revalidation_status.lock().is_required(
-			*header.number(),
-			self.revalidate_time_period,
-			self.revalidate_block_period,
-		);
-		match is_revalidation_required {
-			true => {
-				let revalidation_status = self.revalidation_status.clone();
-				Either::Left(self.pool
-					.revalidate_ready(id, None)
-					.map(|r| r.map_err(|e| warn!("Error revalidating known transactions: {}", e)))
-					.map(move |_| revalidation_status.lock().clear()))
-			},
-			false => Either::Right(ready(())),
-		}
-	}
-}
-
-impl<Block, Client, PoolApi, F> TransactionPoolMaintainer
-for
-	LightBasicPoolMaintainer<Block, Client, PoolApi, F>
-where
-	Block: BlockT,
-	Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + 'static,
-	Client::Api: TaggedTransactionQueue<Block>,
-	PoolApi: ChainApi<Block = Block, Hash = Block::Hash> + 'static,
-	F: Fetcher<Block> + 'static,
-{
-	type Block = Block;
-	type Hash = Block::Hash;
-
-	fn maintain(
-		&self,
-		id: &BlockId<Block>,
-		_retracted: &[Block::Hash],
-	) -> Box<dyn Future<Output=()> + Send + Unpin> {
-		// Do nothing if transaction pool is empty.
-		if self.pool.status().is_empty() {
-			self.revalidation_status.lock().clear();
-			return Box::new(ready(()));
-		}
-		let header = self.client.header(*id)
-			.and_then(|h| h.ok_or(sp_blockchain::Error::UnknownBlock(format!("{}", id))));
-		let header = match header {
-			Ok(header) => header,
-			Err(err) => {
-				println!("Failed to maintain light tx pool: {:?}", err);
-				return Box::new(ready(()));
-			}
-		};
-
-		// else prune block transactions from the pool
-		let prune_future = self.prune(id, &header);
-
-		// and then (optionally) revalidate in-pool transactions
-		let revalidate_future = self.revalidate(id, &header);
-
-		let maintain_future = join(
-			prune_future,
-			revalidate_future,
-		).map(|_| ());
-
-		Box::new(maintain_future)
-	}
-}
-
-/// The status of transactions revalidation at light tx pool.
-#[cfg_attr(test, derive(Debug))]
-enum TxPoolRevalidationStatus<N> {
-	/// The revalidation has never been completed.
-	NotScheduled,
-	/// The revalidation is scheduled.
-	Scheduled(Option<std::time::Instant>, Option<N>),
-	/// The revalidation is in progress.
-	InProgress,
-}
-
-impl<N: Clone + Copy + SimpleArithmetic> TxPoolRevalidationStatus<N> {
-	/// Called when revalidation is completed.
-	pub fn clear(&mut self) {
-		*self = TxPoolRevalidationStatus::NotScheduled;
-	}
-
-	/// Returns true if revalidation is required.
-	pub fn is_required(
-		&mut self,
-		block: N,
-		revalidate_time_period: Option<std::time::Duration>,
-		revalidate_block_period: Option<N>,
-	) -> bool {
-		match *self {
-			TxPoolRevalidationStatus::NotScheduled => {
-				*self = TxPoolRevalidationStatus::Scheduled(
-					revalidate_time_period.map(|period| Instant::now() + period),
-					revalidate_block_period.map(|period| block + period),
-				);
-				false
-			},
-			TxPoolRevalidationStatus::Scheduled(revalidate_at_time, revalidate_at_block) => {
-				let is_required = revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false)
-					|| revalidate_at_block.map(|at| block >= at).unwrap_or(false);
-				if is_required {
-					*self = TxPoolRevalidationStatus::InProgress;
-				}
-				is_required
-			},
-			TxPoolRevalidationStatus::InProgress => false,
-		}
-	}
-}
-
-#[cfg(test)]
-mod tests {
-	use super::*;
-	use futures::executor::block_on;
-	use codec::Encode;
-	use substrate_test_runtime_client::{
-		prelude::*, Client, runtime::{Block, Transfer}, sp_consensus::{BlockOrigin, SelectChain},
-		LongestChain,
-	};
-	use sp_transaction_pool::PoolStatus;
-	use crate::api::{FullChainApi, LightChainApi};
-
-	struct TestSetup<Api: ChainApi> {
-		client: Arc<Client<Backend>>,
-		longest_chain: LongestChain<Backend, Block>,
-		pool: Arc<sc_transaction_graph::Pool<Api>>,
-	}
-
-	impl<Api: ChainApi> TestSetup<Api> {
-		fn new() -> TestSetup<FullChainApi<Client<Backend>, Block>> {
-			let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
-			let client = Arc::new(client);
-			let pool = Arc::new(
-				sc_transaction_graph::Pool::new(Default::default(), FullChainApi::new(client.clone())),
-			);
-			TestSetup {
-				client,
-				longest_chain,
-				pool,
-			}
-		}
-
-		fn new_light<F>(fetcher: Arc<F>) -> TestSetup<LightChainApi<Client<Backend>, F, Block>>
-		where F: Fetcher<Block> + 'static,
-		{
-			let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
-			let client = Arc::new(client);
-			let pool = Arc::new(
-				sc_transaction_graph::Pool::new(
-					Default::default(),
-					LightChainApi::new(client.clone(), fetcher)
-				),
-			);
-			TestSetup {
-				client,
-				longest_chain,
-				pool,
-			}
-		}
-	}
-
-	fn setup() -> TestSetup<FullChainApi<Client<Backend>, Block>> {
-		TestSetup::<FullChainApi<Client<Backend>, Block>>::new()
-	}
-
-	fn setup_light<F>(fetcher: Arc<F>) -> TestSetup<LightChainApi<Client<Backend>, F, Block>>
-	where F: Fetcher<Block> + 'static,
-	{
-		TestSetup::<LightChainApi<Client<Backend>, F, Block>>::new_light(fetcher)
-	}
-
-	#[test]
-	fn should_remove_transactions_from_the_full_pool() {
-		let mut setup = setup();
-
-		let transaction = Transfer {
-			amount: 5,
-			nonce: 0,
-			from: AccountKeyring::Alice.into(),
-			to: Default::default(),
-		}.into_signed_tx();
-		let best = setup.longest_chain.best_chain().unwrap();
-
-		// store the transaction in the pool
-		block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap();
-
-		// import the block
-		let mut builder = setup.client.new_block(Default::default()).unwrap();
-		builder.push(transaction.clone()).unwrap();
-		let block = builder.build().unwrap().block;
-		let id = BlockId::hash(block.header().hash());
-		setup.client.import(BlockOrigin::Own, block).unwrap();
-
-		// fire notification - this should clean up the queue
-		assert_eq!(setup.pool.status().ready, 1);
-		block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[]));
-
-		// then
-		assert_eq!(setup.pool.status().ready, 0);
-		assert_eq!(setup.pool.status().future, 0);
-	}
-
-	#[test]
-	fn should_remove_transactions_from_the_light_pool() {
-		let transaction = Transfer {
-			amount: 5,
-			nonce: 0,
-			from: AccountKeyring::Alice.into(),
-			to: Default::default(),
-		}.into_signed_tx();
-		let fetcher_transaction = transaction.clone();
-		let fetcher = Arc::new(substrate_test_runtime_client::new_light_fetcher()
-			.with_remote_body(Some(Box::new(move |_| Ok(vec![fetcher_transaction.clone()]))))
-			.with_remote_call(Some(Box::new(move |_| {
-				let validity: sp_runtime::transaction_validity::TransactionValidity =
-					Ok(sp_runtime::transaction_validity::ValidTransaction {
-						priority: 0,
-						requires: Vec::new(),
-						provides: vec![vec![42]],
-						longevity: 0,
-						propagate: true,
-					});
-				Ok(validity.encode())
-			}))));
-
-		let setup = setup_light(fetcher.clone());
-		let best = setup.longest_chain.best_chain().unwrap();
-
-		// store the transaction in the pool
-		block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap();
-
-		// fire notification - this should clean up the queue
-		assert_eq!(setup.pool.status().ready, 1);
-		block_on(LightBasicPoolMaintainer::with_defaults(setup.pool.clone(), setup.client.clone(), fetcher).maintain(
-			&BlockId::Number(0),
-			&[],
-		));
-
-		// then
-		assert_eq!(setup.pool.status().ready, 0);
-		assert_eq!(setup.pool.status().future, 0);
-	}
-
-	#[test]
-	fn should_schedule_transactions_revalidation_at_light_pool() {
-		// when revalidation is not scheduled, it became scheduled
-		let mut status = TxPoolRevalidationStatus::NotScheduled;
-		assert!(!status.is_required(10u32, None, None));
-		match status {
-			TxPoolRevalidationStatus::Scheduled(_, _) => (),
-			_ => panic!("Unexpected status: {:?}", status),
-		}
-
-		// revalidation required at time
-		let mut status = TxPoolRevalidationStatus::Scheduled(Some(Instant::now()), None);
-		assert!(status.is_required(10u32, None, None));
-		match status {
-			TxPoolRevalidationStatus::InProgress => (),
-			_ => panic!("Unexpected status: {:?}", status),
-		}
-
-		// revalidation required at block
-		let mut status = TxPoolRevalidationStatus::Scheduled(None, Some(10));
-		assert!(status.is_required(10u32, None, None));
-		match status {
-			TxPoolRevalidationStatus::InProgress => (),
-			_ => panic!("Unexpected status: {:?}", status),
-		}
-	}
-
-	#[test]
-	fn should_revalidate_transactions_at_light_pool() {
-		use std::sync::atomic;
-		use sp_runtime::transaction_validity::*;
-
-		let build_fetcher = || {
-			let validated = Arc::new(atomic::AtomicBool::new(false));
-			Arc::new(substrate_test_runtime_client::new_light_fetcher()
-				.with_remote_body(Some(Box::new(move |_| Ok(vec![]))))
-				.with_remote_call(Some(Box::new(move |_| {
-					let is_inserted = validated.swap(true, atomic::Ordering::SeqCst);
-					let validity: TransactionValidity = if is_inserted {
-						Err(TransactionValidityError::Invalid(
-							InvalidTransaction::Custom(0)
-						))
-					} else {
-						Ok(ValidTransaction {
-							priority: 0,
-							requires: Vec::new(),
-							provides: vec![vec![42]],
-							longevity: 0,
-							propagate: true,
-						})
-					};
-					Ok(validity.encode())
-				}))))
-		};
-
-		fn with_fetcher_maintain<F: Fetcher<Block> + 'static>(
-			fetcher: Arc<F>,
-			revalidate_time_period: Option<std::time::Duration>,
-			revalidate_block_period: Option<u64>,
-			prepare_maintainer: impl Fn(&Mutex<TxPoolRevalidationStatus<u64>>),
-		) -> PoolStatus {
-			let setup = setup_light(fetcher.clone());
-			let best = setup.longest_chain.best_chain().unwrap();
-
-			// let's prepare maintainer
-			let maintainer = LightBasicPoolMaintainer::new(
-				setup.pool.clone(),
-				setup.client.clone(),
-				fetcher,
-				revalidate_time_period,
-				revalidate_block_period,
-			);
-			prepare_maintainer(&*maintainer.revalidation_status);
-
-			// store the transaction in the pool
-			block_on(setup.pool.submit_one(
-				&BlockId::hash(best.hash()),
-				Transfer {
-					amount: 5,
-					nonce: 0,
-					from: AccountKeyring::Alice.into(),
-					to: Default::default(),
-				}.into_signed_tx(),
-			)).unwrap();
-
-			// and run maintain procedures
-			block_on(maintainer.maintain(&BlockId::Number(0), &[]));
-
-			setup.pool.status()
-		}
-
-		// when revalidation is never required - nothing happens
-		let fetcher = build_fetcher();
-		//let maintainer = DefaultLightTransactionPoolMaintainer::new(client.clone(), fetcher.clone(), None, None);
-		let status = with_fetcher_maintain(fetcher, None, None, |_revalidation_status| {});
-		assert_eq!(status.ready, 1);
-
-		// when revalidation is scheduled by time - it is performed
-		let fetcher = build_fetcher();
-		let status = with_fetcher_maintain(fetcher, None, None, |revalidation_status|
-			*revalidation_status.lock() = TxPoolRevalidationStatus::Scheduled(Some(Instant::now()), None)
-		);
-		assert_eq!(status.ready, 0);
-
-		// when revalidation is scheduled by block number - it is performed
-		let fetcher = build_fetcher();
-		let status = with_fetcher_maintain(fetcher, None, None, |revalidation_status|
-			*revalidation_status.lock() = TxPoolRevalidationStatus::Scheduled(None, Some(0))
-		);
-		assert_eq!(status.ready, 0);
-	}
-
-	#[test]
-	fn should_add_reverted_transactions_to_the_pool() {
-		let mut setup = setup();
-
-		let transaction = Transfer {
-			amount: 5,
-			nonce: 0,
-			from: AccountKeyring::Alice.into(),
-			to: Default::default(),
-		}.into_signed_tx();
-		let best = setup.longest_chain.best_chain().unwrap();
-
-		// store the transaction in the pool
-		block_on(setup.pool.submit_one(&BlockId::hash(best.hash()), transaction.clone())).unwrap();
-
-		// import the block
-		let mut builder = setup.client.new_block(Default::default()).unwrap();
-		builder.push(transaction.clone()).unwrap();
-		let block = builder.build().unwrap().block;
-		let block1_hash = block.header().hash();
-		let id = BlockId::hash(block1_hash.clone());
-		setup.client.import(BlockOrigin::Own, block).unwrap();
-
-		// fire notification - this should clean up the queue
-		assert_eq!(setup.pool.status().ready, 1);
-		block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[]));
-
-		// then
-		assert_eq!(setup.pool.status().ready, 0);
-		assert_eq!(setup.pool.status().future, 0);
-
-		// import second block
-		let builder = setup.client.new_block_at(
-			&BlockId::hash(best.hash()),
-			Default::default(),
-			false,
-		).unwrap();
-		let block = builder.build().unwrap().block;
-		let id = BlockId::hash(block.header().hash());
-		setup.client.import(BlockOrigin::Own, block).unwrap();
-
-		// fire notification - this should add the transaction back to the pool.
-		block_on(FullBasicPoolMaintainer::new(setup.pool.clone(), setup.client.clone()).maintain(&id, &[block1_hash]));
-
-		// then
-		assert_eq!(setup.pool.status().ready, 1);
-		assert_eq!(setup.pool.status().future, 0);
-	}
-}
diff --git a/substrate/client/transaction-pool/src/tests.rs b/substrate/client/transaction-pool/src/tests.rs
index 1199e41cf87..778536b7b9a 100644
--- a/substrate/client/transaction-pool/src/tests.rs
+++ b/substrate/client/transaction-pool/src/tests.rs
@@ -14,29 +14,53 @@
 // You should have received a copy of the GNU General Public License
 // along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
 
-
 use super::*;
 
+use crate::{BasicPool, MaintainedTransactionPool};
 use codec::Encode;
 use futures::executor::block_on;
-use sc_transaction_graph::{self, Pool};
-use substrate_test_runtime_client::{runtime::{AccountId, Block, Hash, Index, Extrinsic, Transfer}, AccountKeyring::{self, *}};
+use parking_lot::RwLock;
+use sc_transaction_graph::{self, ExHash, Pool};
 use sp_runtime::{
 	generic::{self, BlockId},
-	traits::{Hash as HashT, BlakeTwo256},
-	transaction_validity::{TransactionValidity, ValidTransaction},
+	traits::{BlakeTwo256, Hash as HashT},
+	transaction_validity::{TransactionValidity, ValidTransaction, TransactionValidityError, InvalidTransaction},
+};
+use std::collections::HashSet;
+use substrate_test_runtime_client::{
+	runtime::{AccountId, Block, BlockNumber, Extrinsic, Hash, Header, Index, Transfer},
+	AccountKeyring::{self, *},
 };
 
 struct TestApi {
-	pub modifier: Box<dyn Fn(&mut ValidTransaction) + Send + Sync>,
+	pub modifier: RwLock<Box<dyn Fn(&mut ValidTransaction) + Send + Sync>>,
+	pub chain_block_by_number: RwLock<HashMap<BlockNumber, Vec<Extrinsic>>>,
+	pub chain_headers_by_number: RwLock<HashMap<BlockNumber, Header>>,
+	pub invalid_hashes: RwLock<HashSet<ExHash<Self>>>,
+	pub validation_requests: RwLock<Vec<Extrinsic>>,
 }
 
 impl TestApi {
 	fn default() -> Self {
 		TestApi {
-			modifier: Box::new(|_| {}),
+			modifier: RwLock::new(Box::new(|_| {})),
+			chain_block_by_number: RwLock::new(HashMap::new()),
+			invalid_hashes: RwLock::new(HashSet::new()),
+			chain_headers_by_number: RwLock::new(HashMap::new()),
+			validation_requests: RwLock::new(Default::default()),
 		}
 	}
+
+	fn push_block(&self, block_number: BlockNumber, xts: Vec<Extrinsic>) {
+		self.chain_block_by_number.write().insert(block_number, xts);
+		self.chain_headers_by_number.write().insert(block_number, Header {
+			number: block_number,
+			digest: Default::default(),
+			extrinsics_root:  Default::default(),
+			parent_hash: Default::default(),
+			state_root: Default::default(),
+		});
+	}
 }
 
 impl sc_transaction_graph::ChainApi for TestApi {
@@ -44,12 +68,16 @@ impl sc_transaction_graph::ChainApi for TestApi {
 	type Hash = Hash;
 	type Error = error::Error;
 	type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
+	type BodyFuture = futures::future::Ready<error::Result<Option<Vec<Extrinsic>>>>;
 
 	fn validate_transaction(
 		&self,
 		at: &BlockId<Self::Block>,
 		uxt: sc_transaction_graph::ExtrinsicFor<Self>,
 	) -> Self::ValidationFuture {
+
+		self.validation_requests.write().push(uxt.clone());
+
 		let expected = index(at);
 		let requires = if expected == uxt.transfer().nonce {
 			vec![]
@@ -58,6 +86,12 @@ impl sc_transaction_graph::ChainApi for TestApi {
 		};
 		let provides = vec![vec![uxt.transfer().nonce as u8]];
 
+		if self.invalid_hashes.read().contains(&self.hash_and_length(&uxt).0) {
+			return futures::future::ready(Ok(
+				Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0)))
+			))
+		}
+
 		let mut validity = ValidTransaction {
 			priority: 1,
 			requires,
@@ -66,29 +100,43 @@ impl sc_transaction_graph::ChainApi for TestApi {
 			propagate: true,
 		};
 
-		(self.modifier)(&mut validity);
+		(self.modifier.read())(&mut validity);
 
-		futures::future::ready(Ok(
-			Ok(validity)
-		))
+		futures::future::ready(Ok(Ok(validity)))
 	}
 
-	fn block_id_to_number(&self, at: &BlockId<Self::Block>) -> error::Result<Option<sc_transaction_graph::NumberFor<Self>>> {
+	fn block_id_to_number(
+		&self,
+		at: &BlockId<Self::Block>,
+	) -> error::Result<Option<sc_transaction_graph::NumberFor<Self>>> {
 		Ok(Some(number_of(at)))
 	}
 
-	fn block_id_to_hash(&self, at: &BlockId<Self::Block>) -> error::Result<Option<sc_transaction_graph::BlockHash<Self>>> {
+	fn block_id_to_hash(
+		&self,
+		at: &BlockId<Self::Block>,
+	) -> error::Result<Option<sc_transaction_graph::BlockHash<Self>>> {
 		Ok(match at {
 			generic::BlockId::Hash(x) => Some(x.clone()),
 			_ => Some(Default::default()),
 		})
 	}
 
-	fn hash_and_length(&self, ex: &sc_transaction_graph::ExtrinsicFor<Self>) -> (Self::Hash, usize) {
+	fn hash_and_length(
+		&self,
+		ex: &sc_transaction_graph::ExtrinsicFor<Self>,
+	) -> (Self::Hash, usize) {
 		let encoded = ex.encode();
 		(BlakeTwo256::hash(&encoded), encoded.len())
 	}
 
+	fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture {
+		futures::future::ready(Ok(if let BlockId::Number(num) = id {
+			self.chain_block_by_number.read().get(num).cloned()
+		} else {
+			None
+		}))
+	}
 }
 
 fn index(at: &BlockId<Block>) -> u64 {
@@ -114,7 +162,11 @@ fn uxt(who: AccountKeyring, nonce: Index) -> Extrinsic {
 }
 
 fn pool() -> Pool<TestApi> {
-	Pool::new(Default::default(), TestApi::default())
+	Pool::new(Default::default(), TestApi::default().into())
+}
+
+fn maintained_pool() -> BasicPool<TestApi, Block> {
+	BasicPool::new(Default::default(), TestApi::default())
 }
 
 #[test]
@@ -192,11 +244,11 @@ fn should_ban_invalid_transactions() {
 
 #[test]
 fn should_correctly_prune_transactions_providing_more_than_one_tag() {
-	let mut api = TestApi::default();
-	api.modifier = Box::new(|v: &mut ValidTransaction| {
+	let api = TestApi::default();
+	*api.modifier.write() = Box::new(|v: &mut ValidTransaction| {
 		v.provides.push(vec![155]);
 	});
-	let pool = Pool::new(Default::default(), api);
+	let pool = Pool::new(Default::default(), Arc::new(api));
 	let xt = uxt(Alice, 209);
 	block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
 	assert_eq!(pool.status().ready, 1);
@@ -220,3 +272,38 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() {
 	assert_eq!(pool.status().ready, 0);
 	assert_eq!(pool.status().future, 2);
 }
+
+#[test]
+fn should_prune_old_during_maintenance() {
+	let xt = uxt(Alice, 209);
+
+	let pool = maintained_pool();
+
+	block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
+	assert_eq!(pool.status().ready, 1);
+
+	pool.api.push_block(1, vec![xt.clone()]);
+
+	block_on(pool.maintain(&BlockId::number(1), &[]));
+	assert_eq!(pool.status().ready, 0);
+}
+
+
+#[test]
+fn should_revalidate_during_maintenance() {
+	let xt1 = uxt(Alice, 209);
+	let xt2 = uxt(Alice, 210);
+
+	let pool = maintained_pool();
+	block_on(pool.submit_one(&BlockId::number(0), xt1.clone())).expect("1. Imported");
+	block_on(pool.submit_one(&BlockId::number(0), xt2.clone())).expect("2. Imported");
+	assert_eq!(pool.status().ready, 2);
+	assert_eq!(pool.api.validation_requests.read().len(), 2);
+
+	pool.api.push_block(1, vec![xt1.clone()]);
+
+	block_on(pool.maintain(&BlockId::number(1), &[]));
+	assert_eq!(pool.status().ready, 1);
+	// test that pool revalidated transaction that left ready and not included in the block
+	assert_eq!(pool.api.validation_requests.read().len(), 3);
+}
diff --git a/substrate/primitives/transaction-pool/src/pool.rs b/substrate/primitives/transaction-pool/src/pool.rs
index e67a9890755..ed24ad0619a 100644
--- a/substrate/primitives/transaction-pool/src/pool.rs
+++ b/substrate/primitives/transaction-pool/src/pool.rs
@@ -20,6 +20,7 @@ use std::{
 	collections::HashMap,
 	hash::Hash,
 	sync::Arc,
+	pin::Pin,
 };
 use futures::{
 	Future, Stream,
@@ -225,6 +226,13 @@ pub trait TransactionPool: Send + Sync {
 	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self>;
 }
 
+/// Trait for transaction pool maintenance.
+pub trait MaintainedTransactionPool : TransactionPool {
+	/// Perform maintenance
+	fn maintain(&self, block: &BlockId<Self::Block>, retracted: &[BlockHash<Self>])
+		-> Pin<Box<dyn Future<Output=()> + Send>>;
+}
+
 /// An abstraction for transaction pool.
 ///
 /// This trait is used by offchain calls to be able to submit transactions.
@@ -264,109 +272,4 @@ impl<TPool: TransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
 				e
 			))
 	}
-}
-
-/// Transaction pool maintainer interface.
-pub trait TransactionPoolMaintainer: Send + Sync {
-	/// Block type.
-	type Block: BlockT;
-	/// Transaction Hash type.
-	type Hash: Hash + Eq + Member + Serialize;
-
-	/// Returns a future that performs maintenance procedures on the pool when
-	/// with given hash is imported.
-	fn maintain(
-		&self,
-		id: &BlockId<Self::Block>,
-		retracted: &[Self::Hash],
-	) -> Box<dyn Future<Output=()> + Send + Unpin>;
-}
-
-/// Maintainable pool implementation.
-pub struct MaintainableTransactionPool<Pool, Maintainer> {
-	pool: Pool,
-	maintainer: Maintainer,
-}
-
-impl<Pool, Maintainer> MaintainableTransactionPool<Pool, Maintainer> {
-	/// Create new maintainable pool using underlying pool and maintainer.
-	pub fn new(pool: Pool, maintainer: Maintainer) -> Self {
-		MaintainableTransactionPool { pool, maintainer }
-	}
-}
-
-impl<Pool, Maintainer> TransactionPool for MaintainableTransactionPool<Pool, Maintainer>
-	where
-		Pool: TransactionPool,
-		Maintainer: Send + Sync,
-{
-	type Block = Pool::Block;
-	type Hash = Pool::Hash;
-	type InPoolTransaction = Pool::InPoolTransaction;
-	type Error = Pool::Error;
-
-	fn submit_at(
-		&self,
-		at: &BlockId<Self::Block>,
-		xts: impl IntoIterator<Item=TransactionFor<Self>> + 'static,
-	) -> Box<dyn Future<Output=Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>> + Send + Unpin> {
-		self.pool.submit_at(at, xts)
-	}
-
-	fn submit_one(
-		&self,
-		at: &BlockId<Self::Block>,
-		xt: TransactionFor<Self>,
-	) -> Box<dyn Future<Output=Result<TxHash<Self>, Self::Error>> + Send + Unpin> {
-		self.pool.submit_one(at, xt)
-	}
-
-	fn submit_and_watch(
-		&self,
-		at: &BlockId<Self::Block>,
-		xt: TransactionFor<Self>,
-	) -> Box<dyn Future<Output=Result<Box<TransactionStatusStreamFor<Self>>, Self::Error>> + Send + Unpin> {
-		self.pool.submit_and_watch(at, xt)
-	}
-
-	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
-		self.pool.remove_invalid(hashes)
-	}
-
-	fn status(&self) -> PoolStatus {
-		self.pool.status()
-	}
-
-	fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>>> {
-		self.pool.ready()
-	}
-
-	fn import_notification_stream(&self) -> ImportNotificationStream {
-		self.pool.import_notification_stream()
-	}
-
-	fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
-		self.pool.hash_of(xt)
-	}
-
-	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
-		self.pool.on_broadcasted(propagations)
-	}
-}
-
-impl<Pool, Maintainer> TransactionPoolMaintainer for MaintainableTransactionPool<Pool, Maintainer>
-	where
-		Pool: Send + Sync,
-		Maintainer: TransactionPoolMaintainer
-{
-	type Block = Maintainer::Block;
-	type Hash = Maintainer::Hash;
-
-	fn maintain(
-		&self,
-		id: &BlockId<Self::Block>,
-		retracted: &[Self::Hash],
-	) -> Box<dyn Future<Output=()> + Send + Unpin> {
-		self.maintainer.maintain(id, retracted)
-	}
-}
+}
\ No newline at end of file
-- 
GitLab