diff --git a/Cargo.lock b/Cargo.lock
index a86f82efc2d90af4b79b859596c276b146ccc828..8c0957db4f03ff06cabb9bb7e85daa176f519099 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11354,6 +11354,7 @@ dependencies = [
  "sc-transaction-pool-api",
  "serde",
  "serde_json",
+ "sp-blockchain",
  "sp-consensus",
  "sp-core 28.0.0",
  "sp-inherents 26.0.0",
@@ -23861,6 +23862,7 @@ version = "28.0.0"
 dependencies = [
  "async-trait",
  "futures",
+ "indexmap 2.7.0",
  "log",
  "parity-scale-codec",
  "serde",
diff --git a/prdoc/pr_6661.prdoc b/prdoc/pr_6661.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..2e39646fededadca2fcd50dc5a835ce10c688d84
--- /dev/null
+++ b/prdoc/pr_6661.prdoc
@@ -0,0 +1,17 @@
+title: '`txpool api`: `remove_invalid` call improved'
+doc:
+- audience: Node Dev
+  description: |-
+    Previously, a transaction reported as invalid by a block builder (or via `remove_invalid` by other components) was silently skipped. This PR improves that behavior. The transaction pool's `report_invalid` function now accepts an optional error for every reported transaction, as well as an optional block hash; both provide hints on how the reported invalid transaction should be handled. Depending on the error, the transaction pool can decide whether the transaction should be removed only from the view or entirely from the pool. An `Invalid` event is dispatched when required.
+
+crates:
+- name: sc-transaction-pool-api
+  bump: minor
+- name: sc-transaction-pool
+  bump: minor
+- name: sc-rpc-spec-v2
+  bump: minor
+- name: sc-rpc
+  bump: minor
+- name: sc-basic-authorship
+  bump: minor
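
For illustration, here is a minimal caller-side sketch of the new API, assuming a `pool` that implements `sc_transaction_pool_api::TransactionPool`; the helper name and parameters are hypothetical and not part of this PR:

```rust
use sc_transaction_pool_api::{
	BlockHash, TransactionPool, TransactionValidityError, TxHash, TxInvalidityReportMap,
};
use sp_runtime::transaction_validity::InvalidTransaction;
use std::sync::Arc;

/// Hypothetical helper: report one stale and one definitely-invalid transaction.
fn report_examples<P: TransactionPool>(
	pool: &P,
	at: BlockHash<P>,
	stale_tx: TxHash<P>,
	bad_tx: TxHash<P>,
) -> Vec<Arc<P::InPoolTransaction>> {
	let mut report = TxInvalidityReportMap::new();
	// A `Stale` (or `Future`) error is only a hint: the transaction may be dropped from the
	// view at `at` while remaining in the pool for other forks.
	report.insert(stale_tx, Some(TransactionValidityError::Invalid(InvalidTransaction::Stale)));
	// `None` forces removal from the pool; watchers receive `TransactionStatus::Invalid`.
	report.insert(bad_tx, None);
	pool.report_invalid(Some(at), report)
}
```
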
diff --git a/substrate/bin/node/bench/Cargo.toml b/substrate/bin/node/bench/Cargo.toml
index 83f7b82cd2b51e755c4b9d9273a281db12a23ad7..b5f94060bb657f441efc1d1c32012914eb28986a 100644
--- a/substrate/bin/node/bench/Cargo.toml
+++ b/substrate/bin/node/bench/Cargo.toml
@@ -36,6 +36,7 @@ sc-transaction-pool = { workspace = true, default-features = true }
 sc-transaction-pool-api = { workspace = true, default-features = true }
 serde = { workspace = true, default-features = true }
 serde_json = { workspace = true, default-features = true }
+sp-blockchain = { workspace = true, default-features = true }
 sp-consensus = { workspace = true, default-features = true }
 sp-core = { workspace = true, default-features = true }
 sp-inherents = { workspace = true, default-features = true }
diff --git a/substrate/bin/node/bench/src/construct.rs b/substrate/bin/node/bench/src/construct.rs
index 22129c6a1d69dacfed6b85b619e00f3f147a24e2..9049732d6d376121bf110817f8a1229d061d1b1a 100644
--- a/substrate/bin/node/bench/src/construct.rs
+++ b/substrate/bin/node/bench/src/construct.rs
@@ -31,7 +31,7 @@ use node_primitives::Block;
 use node_testing::bench::{BenchDb, BlockType, DatabaseType, KeyTypes};
 use sc_transaction_pool_api::{
 	ImportNotificationStream, PoolStatus, ReadyTransactions, TransactionFor, TransactionSource,
-	TransactionStatusStreamFor, TxHash,
+	TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
 };
 use sp_consensus::{Environment, Proposer};
 use sp_inherents::InherentDataProvider;
@@ -271,7 +271,11 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
 		unimplemented!()
 	}
 
-	fn remove_invalid(&self, _hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
+	fn report_invalid(
+		&self,
+		_at: Option<Self::Hash>,
+		_invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
+	) -> Vec<Arc<Self::InPoolTransaction>> {
 		Default::default()
 	}
 
diff --git a/substrate/client/basic-authorship/src/basic_authorship.rs b/substrate/client/basic-authorship/src/basic_authorship.rs
index 2096af1c25bb771f7307dc5ee286c93217df804f..b3519f47a158cdfe7b28e68203b42dc27382a7ed 100644
--- a/substrate/client/basic-authorship/src/basic_authorship.rs
+++ b/substrate/client/basic-authorship/src/basic_authorship.rs
@@ -29,7 +29,7 @@ use futures::{
 use log::{debug, error, info, trace, warn};
 use sc_block_builder::{BlockBuilderApi, BlockBuilderBuilder};
 use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
-use sc_transaction_pool_api::{InPoolTransaction, TransactionPool};
+use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxInvalidityReportMap};
 use sp_api::{ApiExt, CallApiAt, ProvideRuntimeApi};
 use sp_blockchain::{ApplyExtrinsicFailed::Validity, Error::ApplyExtrinsicFailed, HeaderBackend};
 use sp_consensus::{DisableProofRecording, EnableProofRecording, ProofRecording, Proposal};
@@ -413,7 +413,7 @@ where
 		let soft_deadline =
 			now + time::Duration::from_micros(self.soft_deadline_percent.mul_floor(left_micros));
 		let mut skipped = 0;
-		let mut unqueue_invalid = Vec::new();
+		let mut unqueue_invalid = TxInvalidityReportMap::new();
 
 		let delay = deadline.saturating_duration_since((self.now)()) / 8;
 		let mut pending_iterator =
@@ -512,7 +512,13 @@ where
 						target: LOG_TARGET,
 						"[{:?}] Invalid transaction: {} at: {}", pending_tx_hash, e, self.parent_hash
 					);
-					unqueue_invalid.push(pending_tx_hash);
+
+					let error_to_report = match e {
+						ApplyExtrinsicFailed(Validity(e)) => Some(e),
+						_ => None,
+					};
+
+					unqueue_invalid.insert(pending_tx_hash, error_to_report);
 				},
 			}
 		};
@@ -524,7 +530,7 @@ where
 			);
 		}
 
-		self.transaction_pool.remove_invalid(&unqueue_invalid);
+		self.transaction_pool.report_invalid(Some(self.parent_hash), unqueue_invalid);
 		Ok(end_reason)
 	}
 
diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs
index a543969a89b83d4a0ad8d8e789c361d472759095..b06201564c24e6ddd96c3367d988c72f735c7b20 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs
@@ -21,7 +21,7 @@ use codec::Encode;
 use sc_transaction_pool::BasicPool;
 use sc_transaction_pool_api::{
 	ImportNotificationStream, PoolStatus, ReadyTransactions, TransactionFor, TransactionPool,
-	TransactionSource, TransactionStatusStreamFor, TxHash,
+	TransactionSource, TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
 };
 
 use crate::hex_string;
@@ -137,8 +137,12 @@ impl TransactionPool for MiddlewarePool {
 		Ok(watcher.boxed())
 	}
 
-	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
-		self.inner_pool.remove_invalid(hashes)
+	fn report_invalid(
+		&self,
+		at: Option<<Self::Block as BlockT>::Hash>,
+		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
+	) -> Vec<Arc<Self::InPoolTransaction>> {
+		self.inner_pool.report_invalid(at, invalid_tx_errors)
 	}
 
 	fn status(&self) -> PoolStatus {
diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs
index 2fd4ce2454565dfaa4f25c607b1fed83650a7f45..66b0a06bfe3c0e5a8b5105a5b46515199f6893a1 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs
@@ -228,7 +228,7 @@ where
 			}
 
 			// Best effort pool removal (tx can already be finalized).
-			pool.remove_invalid(&[broadcast_state.tx_hash]);
+			pool.report_invalid(None, [(broadcast_state.tx_hash, None)].into());
 		});
 
 		// Keep track of this entry and the abortable handle.
diff --git a/substrate/client/rpc/src/author/mod.rs b/substrate/client/rpc/src/author/mod.rs
index 6afc871e565af4b5e2b96453c7d10ff5fc53c097..0c99da106baffeb9c8545626ef8993b719644033 100644
--- a/substrate/client/rpc/src/author/mod.rs
+++ b/substrate/client/rpc/src/author/mod.rs
@@ -21,19 +21,17 @@
 #[cfg(test)]
 mod tests;
 
-use std::sync::Arc;
-
+use self::error::{Error, Result};
 use crate::{
 	utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
 	SubscriptionTaskExecutor,
 };
-
 use codec::{Decode, Encode};
 use jsonrpsee::{core::async_trait, types::ErrorObject, Extensions, PendingSubscriptionSink};
 use sc_rpc_api::check_if_safe;
 use sc_transaction_pool_api::{
 	error::IntoPoolError, BlockHash, InPoolTransaction, TransactionFor, TransactionPool,
-	TransactionSource, TxHash,
+	TransactionSource, TxHash, TxInvalidityReportMap,
 };
 use sp_api::{ApiExt, ProvideRuntimeApi};
 use sp_blockchain::HeaderBackend;
@@ -41,8 +39,8 @@ use sp_core::Bytes;
 use sp_keystore::{KeystoreExt, KeystorePtr};
 use sp_runtime::traits::Block as BlockT;
 use sp_session::SessionKeys;
+use std::sync::Arc;
 
-use self::error::{Error, Result};
 /// Re-export the API for backward compatibility.
 pub use sc_rpc_api::author::*;
 
@@ -164,17 +162,17 @@ where
 		let hashes = bytes_or_hash
 			.into_iter()
 			.map(|x| match x {
-				hash::ExtrinsicOrHash::Hash(h) => Ok(h),
+				hash::ExtrinsicOrHash::Hash(h) => Ok((h, None)),
 				hash::ExtrinsicOrHash::Extrinsic(bytes) => {
 					let xt = Decode::decode(&mut &bytes[..])?;
-					Ok(self.pool.hash_of(&xt))
+					Ok((self.pool.hash_of(&xt), None))
 				},
 			})
-			.collect::<Result<Vec<_>>>()?;
+			.collect::<Result<TxInvalidityReportMap<TxHash<P>>>>()?;
 
 		Ok(self
 			.pool
-			.remove_invalid(&hashes)
+			.report_invalid(None, hashes)
 			.into_iter()
 			.map(|tx| tx.hash().clone())
 			.collect())
diff --git a/substrate/client/transaction-pool/api/Cargo.toml b/substrate/client/transaction-pool/api/Cargo.toml
index 6671492a4e926b8bed92384bc50cbadde3ed8573..d3ea499beec850c1356fd8a940438b5e58b99177 100644
--- a/substrate/client/transaction-pool/api/Cargo.toml
+++ b/substrate/client/transaction-pool/api/Cargo.toml
@@ -15,6 +15,7 @@ workspace = true
 async-trait = { workspace = true }
 codec = { workspace = true, default-features = true }
 futures = { workspace = true }
+indexmap = { workspace = true }
 log = { workspace = true, default-features = true }
 serde = { features = ["derive"], workspace = true, default-features = true }
 sp-blockchain = { workspace = true, default-features = true }
diff --git a/substrate/client/transaction-pool/api/src/lib.rs b/substrate/client/transaction-pool/api/src/lib.rs
index 6f771e9479bd4ee756bfd88469467acebd1ab535..2bbcc6035f46044069777135c29d245c988273fb 100644
--- a/substrate/client/transaction-pool/api/src/lib.rs
+++ b/substrate/client/transaction-pool/api/src/lib.rs
@@ -33,6 +33,7 @@ const LOG_TARGET: &str = "txpool::api";
 
 pub use sp_runtime::transaction_validity::{
 	TransactionLongevity, TransactionPriority, TransactionSource, TransactionTag,
+	TransactionValidityError,
 };
 
 /// Transaction pool status.
@@ -207,6 +208,9 @@ pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, Bloc
 pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;
 /// Transaction's index within the block in which it was included.
 pub type TxIndex = usize;
+/// Map containing validity errors associated with transaction hashes. Used to report invalid
+/// transactions to the pool.
+pub type TxInvalidityReportMap<H> = indexmap::IndexMap<H, Option<TransactionValidityError>>;
 
 /// In-pool transaction interface.
 ///
@@ -290,8 +294,28 @@ pub trait TransactionPool: Send + Sync {
 	fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
 
 	// *** Block production
-	/// Remove transactions identified by given hashes (and dependent transactions) from the pool.
-	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>>;
+	/// Reports invalid transactions to the transaction pool.
+	///
+	/// This function takes a map where the key is a transaction hash and the value is an
+	/// optional error encountered during the transaction execution, possibly within a specific
+	/// block.
+	///
+	/// The transaction pool implementation decides which transactions to remove. Transactions
+	/// removed from the pool will be notified with a `TransactionStatus::Invalid` event (if
+	/// `submit_and_watch` was used for submission).
+	///
+	/// If the error associated with a transaction is `None`, the transaction will be forcibly
+	/// removed from the pool.
+	///
+	/// The optional `at` parameter provides additional context regarding the block where the error
+	/// occurred.
+	///
+	/// Returns the transactions that were actually removed from the pool.
+	fn report_invalid(
+		&self,
+		at: Option<<Self::Block as BlockT>::Hash>,
+		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
+	) -> Vec<Arc<Self::InPoolTransaction>>;
 
 	// *** logging
 	/// Get futures transaction list.
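
A hedged, test-style sketch of the contract described above, written against the fork-aware pool test helpers that appear near the end of this diff (`pool()`, `uxt`, `new_best_block_event`, `invalid_hash`, `SOURCE`); the exact assertion is illustrative only:

```rust
// Sketch only; assumes the fatp_common helpers used in substrate/client/transaction-pool/tests/fatp.rs.
let (pool, api, _) = pool();
let header01 = api.push_block(1, vec![], true);
block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));

let xt0 = uxt(Alice, 200);
let _xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();

// Reporting with `None` forcibly removes the transaction: it is returned here, and the
// watcher stream is expected to eventually yield `TransactionStatus::Invalid`.
let removed = pool.report_invalid(Some(header01.hash()), [(pool.hash_of(&xt0), None)].into());
assert_eq!(removed.len(), 1);

// Reporting `InvalidTransaction::Stale`/`Future` instead only hints at a view-local removal:
// the transaction may stay in the pool and no `Invalid` event is dispatched for it.
```
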
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
index be20a1608961945c8727e6e6feb24146ba64e9f5..e04c826a1d522aa55e9e8a9e9ca2810ab3da65ba 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
@@ -62,6 +62,11 @@ impl<Hash> DroppedTransaction<Hash> {
 	pub fn new_enforced_by_limts(tx_hash: Hash) -> Self {
 		Self { reason: DroppedReason::LimitsEnforced, tx_hash }
 	}
+
+	/// Creates a new instance with reason set to `DroppedReason::Invalid`.
+	pub fn new_invalid(tx_hash: Hash) -> Self {
+		Self { reason: DroppedReason::Invalid, tx_hash }
+	}
 }
 
 /// Provides reason of why transactions was dropped.
@@ -71,6 +76,8 @@ pub enum DroppedReason<Hash> {
 	Usurped(Hash),
 	/// Transaction was dropped because of internal pool limits being enforced.
 	LimitsEnforced,
+	/// Transaction was dropped because of being invalid.
+	Invalid,
 }
 
 /// Dropped-logic related event from the single view.
@@ -255,7 +262,12 @@ where
 		let (tx_hash, status) = event;
 		match status {
 			TransactionStatus::Future => {
-				self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash);
+				// see note below:
+				if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) {
+					views_keeping_tx_valid.get_mut().insert(block_hash);
+				} else {
+					self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash);
+				}
 			},
 			TransactionStatus::Ready | TransactionStatus::InBlock(..) => {
 				// note: if future transaction was once seen as the ready we may want to treat it
@@ -279,12 +291,23 @@ where
 						return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
 					}
 				} else {
-					debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked) tx");
+					debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked dropped) tx");
 					return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
 				}
 			},
 			TransactionStatus::Usurped(by) =>
 				return Some(DroppedTransaction::new_usurped(tx_hash, by)),
+			TransactionStatus::Invalid => {
+				if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) {
+					views_keeping_tx_valid.get_mut().remove(&block_hash);
+					if views_keeping_tx_valid.get().is_empty() {
+						return Some(DroppedTransaction::new_invalid(tx_hash))
+					}
+				} else {
+					debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked invalid) tx");
+					return Some(DroppedTransaction::new_invalid(tx_hash))
+				}
+			},
 			_ => {},
 		};
 		None
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
index 6195cf53b6072aec2ab95fa6dc4138fdbe8bbefe..5b43d900848abecacdd599806eb9d9cb24bf042f 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
@@ -54,7 +54,7 @@ use prometheus_endpoint::Registry as PrometheusRegistry;
 use sc_transaction_pool_api::{
 	error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
 	MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionPriority,
-	TransactionSource, TransactionStatusStreamFor, TxHash,
+	TransactionSource, TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
 };
 use sp_blockchain::{HashAndNumber, TreeRoute};
 use sp_core::traits::SpawnEssentialNamed;
@@ -293,14 +293,16 @@ where
 							tx_hash = ?new_tx_hash,
 							"error: dropped_monitor_task: no entry in mempool for new transaction"
 						);
-					}
+					};
+				},
+				DroppedReason::LimitsEnforced | DroppedReason::Invalid => {
+					view_store.remove_transaction_subtree(tx_hash, |_, _| {});
 				},
-				DroppedReason::LimitsEnforced => {},
 			};
 
-			mempool.remove_transaction(&tx_hash);
-			view_store.listener.transaction_dropped(dropped);
+			mempool.remove_transactions(&[tx_hash]);
 			import_notification_sink.clean_notified_items(&[tx_hash]);
+			view_store.listener.transaction_dropped(dropped);
 		}
 	}
 
@@ -763,26 +765,24 @@ where
 		//
 		// Finally, it collects the hashes of updated transactions or submission errors (either
 		// from the mempool or view_store) into a returned vector.
+		const RESULTS_ASSUMPTION: &str =
+			"The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.";
 		Ok(mempool_results
-				.into_iter()
-				.map(|result| {
-					result
-						.map_err(Into::into)
-						.and_then(|insertion| {
-							submission_results
-								.next()
-								.expect("The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.")
-								.inspect_err(|_|{
-									mempool.remove_transaction(&insertion.hash);
-								})
+			.into_iter()
+			.map(|result| {
+				result.map_err(Into::into).and_then(|insertion| {
+					submission_results.next().expect(RESULTS_ASSUMPTION).inspect_err(|_| {
+						mempool.remove_transactions(&[insertion.hash]);
 					})
-
 				})
-				.map(|r| r.map(|r| {
+			})
+			.map(|r| {
+				r.map(|r| {
 					mempool.update_transaction_priority(&r);
 					r.hash()
-				}))
-				.collect::<Vec<_>>())
+				})
+			})
+			.collect::<Vec<_>>())
 	}
 
 	/// Submits a single transaction and returns a future resolving to the submission results.
@@ -839,7 +839,7 @@ where
 			.submit_and_watch(at, insertion.source, xt)
 			.await
 			.inspect_err(|_| {
-				self.mempool.remove_transaction(&insertion.hash);
+				self.mempool.remove_transactions(&[insertion.hash]);
 			})
 			.map(|mut outcome| {
 				self.mempool.update_transaction_priority(&outcome);
@@ -847,18 +847,34 @@ where
 			})
 	}
 
-	/// Intended to remove transactions identified by the given hashes, and any dependent
-	/// transactions, from the pool. In current implementation this function only outputs the error.
-	/// Seems that API change is needed here to make this call reasonable.
-	// todo [#5491]: api change? we need block hash here (assuming we need it at all - could be
-	// useful for verification for debugging purposes).
-	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
-		if !hashes.is_empty() {
-			log_xt_trace!(target:LOG_TARGET, hashes, "fatp::remove_invalid");
-			self.metrics
-				.report(|metrics| metrics.removed_invalid_txs.inc_by(hashes.len() as _));
-		}
-		Default::default()
+	/// Reports invalid transactions to the transaction pool.
+	///
+	/// This function takes a map of transaction hashes to the optional errors encountered during
+	/// transaction execution at the given block.
+	///
+	/// The transaction pool implementation will determine which transactions should be
+	/// removed from the pool. Transactions that depend on invalid transactions will also
+	/// be removed.
+	fn report_invalid(
+		&self,
+		at: Option<<Self::Block as BlockT>::Hash>,
+		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
+	) -> Vec<Arc<Self::InPoolTransaction>> {
+		debug!(target: LOG_TARGET, len = ?invalid_tx_errors.len(), "fatp::report_invalid");
+		log_xt_trace!(data: tuple, target:LOG_TARGET, invalid_tx_errors.iter(), "fatp::report_invalid {:?}");
+		self.metrics
+			.report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _));
+
+		let removed = self.view_store.report_invalid(at, invalid_tx_errors);
+
+		let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>();
+		self.mempool.remove_transactions(&removed_hashes);
+		self.import_notification_sink.clean_notified_items(&removed_hashes);
+
+		self.metrics
+			.report(|metrics| metrics.removed_invalid_txs.inc_by(removed_hashes.len() as _));
+
+		removed
 	}
 
 	// todo [#5491]: api change?
@@ -987,7 +1003,7 @@ where
 		self.view_store
 			.submit_local(xt)
 			.inspect_err(|_| {
-				self.mempool.remove_transaction(&insertion.hash);
+				self.mempool.remove_transactions(&[insertion.hash]);
 			})
 			.map(|outcome| {
 				self.mempool.update_transaction_priority(&outcome);
@@ -1245,7 +1261,7 @@ where
 			for result in results {
 				if let Err(tx_hash) = result {
 					self.view_store.listener.transactions_invalidated(&[tx_hash]);
-					self.mempool.remove_transaction(&tx_hash);
+					self.mempool.remove_transactions(&[tx_hash]);
 				}
 			}
 		}
@@ -1382,6 +1398,7 @@ where
 			self.revalidation_queue
 				.revalidate_mempool(
 					self.mempool.clone(),
+					self.view_store.clone(),
 					HashAndNumber { hash: finalized_hash, number: finalized_number },
 				)
 				.await;
@@ -1487,7 +1504,12 @@ where
 			self.mempool.try_insert_with_replacement(xt, priority, source, watched)?;
 
 		for worst_hash in &insertion_info.removed {
-			log::trace!(target: LOG_TARGET, "removed: {worst_hash:?} replaced by {tx_hash:?}");
+			trace!(
+				target: LOG_TARGET,
+				tx_hash = ?worst_hash,
+				new_tx_hash = ?tx_hash,
+				"removed: replaced by"
+			);
 			self.view_store
 				.listener
 				.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs
index c04741e1c1d9e2645e4bc5c3c2979835f78d1c6e..8ddc9f588cc5e6cc99fd24f96ea746745a008c3c 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs
@@ -29,6 +29,8 @@ use prometheus_endpoint::{
 	exponential_buckets, histogram_opts, linear_buckets, register, Counter, Gauge, Histogram,
 	PrometheusError, Registry, U64,
 };
+#[cfg(doc)]
+use sc_transaction_pool_api::TransactionPool;
 use sc_transaction_pool_api::TransactionStatus;
 use sc_utils::mpsc;
 use std::{
@@ -55,6 +57,11 @@ pub struct Metrics {
 	/// Total number of unwatched transactions in txpool.
 	pub unwatched_txs: Gauge<U64>,
 	/// Total number of transactions reported as invalid.
+	///
+	/// This only includes transactions reported as invalid by the
+	/// [`TransactionPool::report_invalid`] method.
+	pub reported_invalid_txs: Counter<U64>,
+	/// Total number of transactions removed as invalid.
 	pub removed_invalid_txs: Counter<U64>,
 	/// Total number of transactions from imported blocks that are unknown to the pool.
 	pub unknown_from_block_import_txs: Counter<U64>,
@@ -255,10 +262,17 @@ impl MetricsRegistrant for Metrics {
 				)?,
 				registry,
 			)?,
+			reported_invalid_txs: register(
+				Counter::new(
+					"substrate_sub_txpool_reported_invalid_txs_total",
+					"Total number of transactions reported as invalid by external entities using the TxPool API.",
+				)?,
+				registry,
+			)?,
 			removed_invalid_txs: register(
 				Counter::new(
 					"substrate_sub_txpool_removed_invalid_txs_total",
-					"Total number of transactions reported as invalid.",
+					"Total number of transactions removed as invalid.",
 				)?,
 				registry,
 			)?,
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
index 959df2ffe978471711387be3d671f06aa79d43e0..62c4320e5d3531abe22c531b90fa99dfcb30d585 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
@@ -132,6 +132,8 @@ where
 				TransactionStatus::Usurped(*by),
 			TransactionStatusUpdate::TransactionDropped(_, DroppedReason::LimitsEnforced) =>
 				TransactionStatus::Dropped,
+			TransactionStatusUpdate::TransactionDropped(_, DroppedReason::Invalid) =>
+				TransactionStatus::Invalid,
 		}
 	}
 }
@@ -311,33 +313,9 @@ where
 		&mut self,
 		request: TransactionStatusUpdate<ChainApi>,
 	) -> Option<TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>> {
-		match request {
-			TransactionStatusUpdate::TransactionInvalidated(..) =>
-				if self.handle_invalidate_transaction() {
-					log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Invalid", self.tx_hash);
-					return Some(TransactionStatus::Invalid)
-				},
-			TransactionStatusUpdate::TransactionFinalized(_, block, index) => {
-				log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Finalized", self.tx_hash);
-				self.terminate = true;
-				return Some(TransactionStatus::Finalized((block, index)))
-			},
-			TransactionStatusUpdate::TransactionBroadcasted(_, peers) => {
-				log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Broadcasted", self.tx_hash);
-				return Some(TransactionStatus::Broadcast(peers))
-			},
-			TransactionStatusUpdate::TransactionDropped(_, DroppedReason::LimitsEnforced) => {
-				log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Dropped", self.tx_hash);
-				self.terminate = true;
-				return Some(TransactionStatus::Dropped)
-			},
-			TransactionStatusUpdate::TransactionDropped(_, DroppedReason::Usurped(by)) => {
-				log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Usurped({:?})", self.tx_hash, by);
-				self.terminate = true;
-				return Some(TransactionStatus::Usurped(by))
-			},
-		};
-		None
+		let status = Into::<TransactionStatus<_, _>>::into(&request);
+		status.is_final().then(|| self.terminate = true);
+		return Some(status);
 	}
 
 	/// Handles various transaction status updates from individual views and manages internal states
@@ -401,34 +379,6 @@ where
 		}
 	}
 
-	/// Handles transaction invalidation sent via side channel.
-	///
-	/// Function may set the context termination flag, which will close the stream.
-	///
-	/// Returns true if the event should be sent out, and false if the invalidation request should
-	/// be skipped.
-	fn handle_invalidate_transaction(&mut self) -> bool {
-		let keys = self.known_views.clone();
-		trace!(
-			target: LOG_TARGET,
-			tx_hash = ?self.tx_hash,
-			views = ?self.known_views.iter().collect::<Vec<_>>(),
-			"got invalidate_transaction"
-		);
-		if self.views_keeping_tx_valid.is_disjoint(&keys) {
-			self.terminate = true;
-			true
-		} else {
-			//todo [#5477]
-			// - handle corner case:  this may happen when tx is invalid for mempool, but somehow
-			//   some view still sees it as ready/future. In that case we don't send the invalid
-			//   event, as transaction can still be included. Probably we should set some flag here
-			//   and allow for invalid sent from the view.
-			// - add debug / metrics,
-			false
-		}
-	}
-
 	/// Adds a new aggragted transaction status stream.
 	///
 	/// Inserts a new view's transaction status stream into the stream map. The view is represented
@@ -493,32 +443,31 @@ where
 				Some((view_hash, (tx_hash, status))) =  next_event(&mut aggregated_streams_map) => {
 					events_metrics_collector.report_status(tx_hash, status.clone());
 					if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) {
-						log::trace!(
+						trace!(
 							target: LOG_TARGET,
-							"[{:?}] aggregated_stream_map event: view:{} status:{:?}",
-							tx_hash,
-							view_hash,
-							status
+							?tx_hash,
+							?view_hash,
+							?status,
+							"aggregated_stream_map event",
 						);
-						if let Err(e) = ctrl
+						if let Err(error) = ctrl
 							.get_mut()
 							.unbounded_send(ExternalWatcherCommand::ViewTransactionStatus(view_hash, status))
 						{
-							trace!(target: LOG_TARGET, "[{:?}] send status failed: {:?}", tx_hash, e);
+							trace!(target: LOG_TARGET, ?tx_hash, ?error, "send status failed");
 							ctrl.remove();
 						}
 					}
 				},
 				cmd = command_receiver.next() => {
-					log::trace!(target: LOG_TARGET, "cmd {:?}", cmd);
 					match cmd {
 						Some(ControllerCommand::AddViewStream(h,stream)) => {
 							aggregated_streams_map.insert(h,stream);
 							// //todo: aysnc and join all?
 							external_watchers_tx_hash_map.write().retain(|tx_hash, ctrl| {
 								ctrl.unbounded_send(ExternalWatcherCommand::AddView(h))
-									.inspect_err(|e| {
-										trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction: send message failed: {:?}", tx_hash, e);
+									.inspect_err(|error| {
+										trace!(target: LOG_TARGET, ?tx_hash, ?error, "add_view: send message failed");
 									})
 									.is_ok()
 							})
@@ -528,8 +477,8 @@ where
 							//todo: aysnc and join all?
 							external_watchers_tx_hash_map.write().retain(|tx_hash, ctrl| {
 								ctrl.unbounded_send(ExternalWatcherCommand::RemoveView(h))
-									.inspect_err(|e| {
-										trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction: send message failed: {:?}", tx_hash, e);
+									.inspect_err(|error| {
+										trace!(target: LOG_TARGET, ?tx_hash, ?error, "remove_view: send message failed");
 									})
 									.is_ok()
 							})
@@ -539,11 +488,11 @@ where
 							let tx_hash = request.hash();
 							events_metrics_collector.report_status(tx_hash, (&request).into());
 							if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) {
-								if let Err(e) = ctrl
+								if let Err(error) = ctrl
 									.get_mut()
 									.unbounded_send(ExternalWatcherCommand::PoolTransactionStatus(request))
 								{
-									trace!(target: LOG_TARGET, "[{:?}] send message failed: {:?}", tx_hash, e);
+									trace!(target: LOG_TARGET, ?tx_hash, ?error, "send message failed");
 									ctrl.remove();
 								}
 							}
@@ -621,7 +570,7 @@ where
 		Some(
 			futures::stream::unfold(external_ctx, |mut ctx| async move {
 				if ctx.terminate {
-					log::trace!(target: LOG_TARGET, "[{:?}] terminate", ctx.tx_hash);
+					trace!(target: LOG_TARGET, tx_hash = ?ctx.tx_hash, "terminate");
 					return None
 				}
 				loop {
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs
index 0025d3e9f2d42aad0db1c1747fe9e4cc9d7476ef..2f3d31d0e6fde7484a9fbbabfa3a54aa43ec0448 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs
@@ -28,7 +28,7 @@ use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound
 use sp_blockchain::HashAndNumber;
 use sp_runtime::traits::Block as BlockT;
 
-use super::tx_mem_pool::TxMemPool;
+use super::{tx_mem_pool::TxMemPool, view_store::ViewStore};
 use futures::prelude::*;
 use tracing::{trace, warn};
 
@@ -45,7 +45,7 @@ where
 	/// Communication channels with maintain thread are also provided.
 	RevalidateView(Arc<View<Api>>, FinishRevalidationWorkerChannels<Api>),
 	/// Request to revalidated the given instance of the [`TxMemPool`] at provided block hash.
-	RevalidateMempool(Arc<TxMemPool<Api, Block>>, HashAndNumber<Block>),
+	RevalidateMempool(Arc<TxMemPool<Api, Block>>, Arc<ViewStore<Api, Block>>, HashAndNumber<Block>),
 }
 
 /// The background revalidation worker.
@@ -81,8 +81,11 @@ where
 			match payload {
 				WorkerPayload::RevalidateView(view, worker_channels) =>
 					view.revalidate(worker_channels).await,
-				WorkerPayload::RevalidateMempool(mempool, finalized_hash_and_number) =>
-					mempool.revalidate(finalized_hash_and_number).await,
+				WorkerPayload::RevalidateMempool(
+					mempool,
+					view_store,
+					finalized_hash_and_number,
+				) => mempool.revalidate(view_store, finalized_hash_and_number).await,
 			};
 		}
 	}
@@ -164,6 +167,7 @@ where
 	pub async fn revalidate_mempool(
 		&self,
 		mempool: Arc<TxMemPool<Api, Block>>,
+		view_store: Arc<ViewStore<Api, Block>>,
 		finalized_hash: HashAndNumber<Block>,
 	) {
 		trace!(
@@ -173,9 +177,11 @@ where
 		);
 
 		if let Some(ref to_worker) = self.background {
-			if let Err(error) =
-				to_worker.unbounded_send(WorkerPayload::RevalidateMempool(mempool, finalized_hash))
-			{
+			if let Err(error) = to_worker.unbounded_send(WorkerPayload::RevalidateMempool(
+				mempool,
+				view_store,
+				finalized_hash,
+			)) {
 				warn!(
 					target: LOG_TARGET,
 					?error,
@@ -183,7 +189,7 @@ where
 				);
 			}
 		} else {
-			mempool.revalidate(finalized_hash).await
+			mempool.revalidate(view_store, finalized_hash).await
 		}
 	}
 }
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
index d64d80d434308647c9343c01f78ca8e15079da3a..559f11da4cdb2675424ebae43fe4c0ca5b829965 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
@@ -28,7 +28,7 @@
 
 use std::{
 	cmp::Ordering,
-	collections::HashMap,
+	collections::{HashMap, HashSet},
 	sync::{
 		atomic::{self, AtomicU64},
 		Arc,
@@ -56,8 +56,9 @@ use crate::{
 };
 
 use super::{
-	metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener,
-	view_store::ViewStoreSubmitOutcome,
+	metrics::MetricsLink as PrometheusMetrics,
+	multi_view_listener::MultiViewListener,
+	view_store::{ViewStore, ViewStoreSubmitOutcome},
 };
 
 /// The minimum interval between single transaction revalidations. Given in blocks.
@@ -467,13 +468,13 @@ where
 		self.transactions.clone_map()
 	}
 
-	/// Removes a transaction with given hash from the memory pool.
-	pub(super) fn remove_transaction(
-		&self,
-		tx_hash: &ExtrinsicHash<ChainApi>,
-	) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
-		debug!(target: LOG_TARGET, ?tx_hash, "mempool::remove_transaction");
-		self.transactions.write().remove(tx_hash)
+	/// Removes transactions with given hashes from the memory pool.
+	pub(super) fn remove_transactions(&self, tx_hashes: &[ExtrinsicHash<ChainApi>]) {
+		log_xt_trace!(target: LOG_TARGET, tx_hashes, "mempool::remove_transactions");
+		let mut transactions = self.transactions.write();
+		for tx_hash in tx_hashes {
+			transactions.remove(tx_hash);
+		}
 	}
 
 	/// Revalidates a batch of transactions against the provided finalized block.
@@ -483,7 +484,7 @@ where
 		trace!(
 			target: LOG_TARGET,
 			?finalized_block,
-			"mempool::revalidate"
+			"mempool::revalidate_inner"
 		);
 		let start = Instant::now();
 
@@ -531,7 +532,7 @@ where
 						target: LOG_TARGET,
 						?tx_hash,
 						?validation_result,
-						"Purging: invalid"
+						"mempool::revalidate_inner invalid"
 					);
 					Some(tx_hash)
 				},
@@ -545,7 +546,7 @@ where
 			count,
 			invalid_hashes = invalid_hashes.len(),
 			?duration,
-			"mempool::revalidate"
+			"mempool::revalidate_inner"
 		);
 
 		invalid_hashes
@@ -570,23 +571,50 @@ where
 
 	/// Revalidates transactions in the memory pool against a given finalized block and removes
 	/// invalid ones.
-	pub(super) async fn revalidate(&self, finalized_block: HashAndNumber<Block>) {
-		trace!(
-			target: LOG_TARGET,
-			?finalized_block,
-			"purge_transactions"
-		);
-		let invalid_hashes = self.revalidate_inner(finalized_block.clone()).await;
+	pub(super) async fn revalidate(
+		&self,
+		view_store: Arc<ViewStore<ChainApi, Block>>,
+		finalized_block: HashAndNumber<Block>,
+	) {
+		let revalidated_invalid_hashes = self.revalidate_inner(finalized_block.clone()).await;
+
+		let mut invalid_hashes_subtrees =
+			revalidated_invalid_hashes.clone().into_iter().collect::<HashSet<_>>();
+		for tx in &revalidated_invalid_hashes {
+			invalid_hashes_subtrees.extend(
+				view_store
+					.remove_transaction_subtree(*tx, |_, _| {})
+					.into_iter()
+					.map(|tx| tx.hash),
+			);
+		}
+
+		{
+			let mut transactions = self.transactions.write();
+			invalid_hashes_subtrees.iter().for_each(|tx_hash| {
+				transactions.remove(&tx_hash);
+			});
+		};
 
 		self.metrics.report(|metrics| {
-			metrics.mempool_revalidation_invalid_txs.inc_by(invalid_hashes.len() as _)
+			metrics
+				.mempool_revalidation_invalid_txs
+				.inc_by(invalid_hashes_subtrees.len() as _)
 		});
 
-		let mut transactions = self.transactions.write();
-		invalid_hashes.iter().for_each(|i| {
-			transactions.remove(i);
-		});
-		self.listener.transactions_invalidated(&invalid_hashes);
+		let revalidated_invalid_hashes_len = revalidated_invalid_hashes.len();
+		let invalid_hashes_subtrees_len = invalid_hashes_subtrees.len();
+
+		self.listener
+			.transactions_invalidated(&invalid_hashes_subtrees.into_iter().collect::<Vec<_>>());
+
+		trace!(
+			target: LOG_TARGET,
+			?finalized_block,
+			revalidated_invalid_hashes_len,
+			invalid_hashes_subtrees_len,
+			"mempool::revalidate"
+		);
 	}
 
 	/// Updates the priority of transaction stored in mempool using provided view_store submission
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
index 555444956122b721d220b3754fa67d2308381e89..4fa83ccc79bfa4a0348043ff547e10718b7fe30b 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
@@ -28,10 +28,11 @@ use crate::{
 	common::tracing_log_xt::log_xt_trace,
 	graph::{
 		self, base_pool::TimedTransactionSource, ExtrinsicFor, ExtrinsicHash, IsValidator,
-		ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor,
+		TransactionFor, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor,
 	},
 	LOG_TARGET,
 };
+use indexmap::IndexMap;
 use parking_lot::Mutex;
 use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus};
 use sp_blockchain::HashAndNumber;
@@ -39,11 +40,11 @@ use sp_runtime::{
 	generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError,
 	SaturatedConversion,
 };
-use std::{collections::HashMap, sync::Arc, time::Instant};
+use std::{sync::Arc, time::Instant};
 use tracing::{debug, trace};
 
 pub(super) struct RevalidationResult<ChainApi: graph::ChainApi> {
-	revalidated: HashMap<ExtrinsicHash<ChainApi>, ValidatedTransactionFor<ChainApi>>,
+	revalidated: IndexMap<ExtrinsicHash<ChainApi>, ValidatedTransactionFor<ChainApi>>,
 	invalid_hashes: Vec<ExtrinsicHash<ChainApi>>,
 }
 
@@ -274,7 +275,7 @@ where
 		//todo: revalidate future, remove if invalid [#5496]
 
 		let mut invalid_hashes = Vec::new();
-		let mut revalidated = HashMap::new();
+		let mut revalidated = IndexMap::new();
 
 		let mut validation_results = vec![];
 		let mut batch_iter = batch.into_iter();
@@ -317,7 +318,12 @@ where
 			duration = ?revalidation_duration,
 			"view::revalidate"
 		);
-		log_xt_trace!(data:tuple, target:LOG_TARGET, validation_results.iter().map(|x| (x.1, &x.0)), "view::revalidate result: {:?}");
+		log_xt_trace!(
+			data:tuple,
+			target:LOG_TARGET,
+			validation_results.iter().map(|x| (x.1, &x.0)),
+			"view::revalidate result: {:?}"
+		);
 		for (validation_result, tx_hash, tx) in validation_results {
 			match validation_result {
 				Ok(Err(TransactionValidityError::Invalid(_))) => {
@@ -494,12 +500,12 @@ where
 	/// Refer to [`crate::graph::ValidatedPool::remove_subtree`] for more details.
 	pub fn remove_subtree<F>(
 		&self,
-		tx_hash: ExtrinsicHash<ChainApi>,
+		hashes: &[ExtrinsicHash<ChainApi>],
 		listener_action: F,
-	) -> Vec<ExtrinsicHash<ChainApi>>
+	) -> Vec<TransactionFor<ChainApi>>
 	where
 		F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>),
 	{
-		self.pool.validated_pool().remove_subtree(tx_hash, listener_action)
+		self.pool.validated_pool().remove_subtree(hashes, listener_action)
 	}
 }
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
index e534decf9b1ada3d692a1ed3392475b4983b56ae..96cf9f7106894d43e17e2f089f3a513e6f0c3e9d 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
@@ -33,9 +33,13 @@ use crate::{
 };
 use itertools::Itertools;
 use parking_lot::RwLock;
-use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus};
+use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus, TxInvalidityReportMap};
 use sp_blockchain::TreeRoute;
-use sp_runtime::{generic::BlockId, traits::Block as BlockT};
+use sp_runtime::{
+	generic::BlockId,
+	traits::Block as BlockT,
+	transaction_validity::{InvalidTransaction, TransactionValidityError},
+};
 use std::{
 	collections::{hash_map::Entry, HashMap, HashSet},
 	sync::Arc,
@@ -257,7 +261,7 @@ where
 					target: LOG_TARGET,
 					?tx_hash,
 					%error,
-					"submit_local: err"
+					"submit_local failed"
 				);
 				Err(error)
 			},
@@ -306,7 +310,7 @@ where
 					target: LOG_TARGET,
 					?tx_hash,
 					%error,
-					"submit_and_watch: err"
+					"submit_and_watch failed"
 				);
 				return Err(error);
 			},
@@ -669,6 +673,72 @@ where
 		);
 	}
 
+	/// Reports invalid transactions to the view store.
+	///
+	/// This function accepts a map in which each entry associates a transaction hash with an
+	/// optional error encountered during the transaction execution at a specific (also optional)
+	/// block.
+	///
+	/// The removal operation applies to the provided transactions. Their descendants may be removed
+	/// from the view, but will not be invalidated or banned.
+	///
+	/// Transactions that are invalid because they are future or stale will be removed only from the
+	/// given `at` view and will be kept in the view_store. Such transactions will not be reported in
+	/// the returned vector. They also will not be banned from re-entering the pool (although they
+	/// can be rejected from re-entering the view). No event will be triggered.
+	///
+	/// For other errors, the transaction will be removed from the view_store and included in the
+	/// returned vector. Additionally, transactions provided as input will be banned from
+	/// re-entering the pool.
+	///
+	/// If the reported error is `None`, the transaction will be forcibly removed from the
+	/// view_store, banned, and included in the returned vector.
+	///
+	/// For every transaction removed from the view_store (excluding descendants) an `Invalid` event
+	/// is triggered.
+	///
+	/// Returns the list of transactions actually removed, limited to those present in the provided
+	/// input list.
+	pub(crate) fn report_invalid(
+		&self,
+		at: Option<Block::Hash>,
+		invalid_tx_errors: TxInvalidityReportMap<ExtrinsicHash<ChainApi>>,
+	) -> Vec<TransactionFor<ChainApi>> {
+		let mut remove_from_view = vec![];
+		let mut remove_from_pool = vec![];
+
+		invalid_tx_errors.into_iter().for_each(|(hash, e)| match e {
+			Some(TransactionValidityError::Invalid(
+				InvalidTransaction::Future | InvalidTransaction::Stale,
+			)) => {
+				remove_from_view.push(hash);
+			},
+			_ => {
+				remove_from_pool.push(hash);
+			},
+		});
+
+		// Transactions removed only from the view are not included in the final result, as they
+		// may still be in the pool.
+		at.map(|at| {
+			self.get_view_at(at, true)
+				.map(|(view, _)| view.remove_subtree(&remove_from_view, |_, _| {}))
+		});
+
+		let mut removed = vec![];
+		for tx_hash in &remove_from_pool {
+			let removed_from_pool = self.remove_transaction_subtree(*tx_hash, |_, _| {});
+			removed_from_pool
+				.iter()
+				.find(|tx| tx.hash == *tx_hash)
+				.map(|tx| removed.push(tx.clone()));
+		}
+
+		self.listener.transactions_invalidated(&remove_from_pool);
+
+		removed
+	}
+
 	/// Replaces an existing transaction in the view_store with a new one.
 	///
 	/// Attempts to replace a transaction identified by `replaced` with a new transaction `xt`.
@@ -724,7 +794,7 @@ where
 					));
 				},
 				PreInsertAction::RemoveSubtree(ref removal) => {
-					view.remove_subtree(removal.xt_hash, &*removal.listener_action);
+					view.remove_subtree(&[removal.xt_hash], &*removal.listener_action);
 				},
 			}
 		}
@@ -804,7 +874,7 @@ where
 		&self,
 		xt_hash: ExtrinsicHash<ChainApi>,
 		listener_action: F,
-	) -> Vec<ExtrinsicHash<ChainApi>>
+	) -> Vec<TransactionFor<ChainApi>>
 	where
 		F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>)
 			+ Clone
@@ -827,8 +897,8 @@ where
 			.iter()
 			.chain(self.inactive_views.read().iter())
 			.filter(|(_, view)| view.is_imported(&xt_hash))
-			.flat_map(|(_, view)| view.remove_subtree(xt_hash, &listener_action))
-			.filter(|xt_hash| seen.insert(*xt_hash))
+			.flat_map(|(_, view)| view.remove_subtree(&[xt_hash], &listener_action))
+			.filter_map(|xt| seen.insert(xt.hash).then(|| xt.clone()))
 			.collect();
 
 		if let Some(removal_action) = self.pending_txs_tasks.write().get_mut(&xt_hash) {
diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs
index 0e70334ea0e2485967e35e533dc7281a3cef9421..340b6d429ae7e969f81b59ea2bf5ed58ac313758 100644
--- a/substrate/client/transaction-pool/src/graph/listener.rs
+++ b/substrate/client/transaction-pool/src/graph/listener.rs
@@ -43,7 +43,8 @@ pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
 	watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>,
 	finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>,
 
-	/// The sink used to notify dropped by enforcing limits or by being usurped transactions.
+	/// The sink used to notify about transactions that were dropped due to enforced limits,
+	/// usurped, or found to be invalid.
 	///
 	/// Note: Ready and future statuses are alse communicated through this channel, enabling the
 	/// stream consumer to track views that reference the transaction.
@@ -195,6 +196,8 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H
 	pub fn invalid(&mut self, tx: &H) {
 		trace!(target: LOG_TARGET, "[{:?}] Extrinsic invalid", tx);
 		self.fire(tx, |watcher| watcher.invalid());
+
+		self.send_to_dropped_stream_sink(tx, TransactionStatus::Invalid);
 	}
 
 	/// Transaction was pruned from the pool.
diff --git a/substrate/client/transaction-pool/src/graph/pool.rs b/substrate/client/transaction-pool/src/graph/pool.rs
index 52b12e3fabae6c0ed883427d36111401fe9c9b13..d938e9bf06e7d84b75bb9173c9304a2e47ded5ce 100644
--- a/substrate/client/transaction-pool/src/graph/pool.rs
+++ b/substrate/client/transaction-pool/src/graph/pool.rs
@@ -29,7 +29,6 @@ use sp_runtime::{
 	},
 };
 use std::{
-	collections::HashMap,
 	sync::Arc,
 	time::{Duration, Instant},
 };
@@ -248,7 +247,7 @@ impl<B: ChainApi> Pool<B> {
 	/// Resubmit some transaction that were validated elsewhere.
 	pub fn resubmit(
 		&self,
-		revalidated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
+		revalidated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
 	) {
 		let now = Instant::now();
 		self.validated_pool.resubmit(revalidated_transactions);
diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs
index 9631a27ead93416b4fbc52aa35d2f0b2c036bfad..174b69da7611b46a4981ea568dcbefecfed34b87 100644
--- a/substrate/client/transaction-pool/src/graph/validated_pool.rs
+++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs
@@ -23,6 +23,7 @@ use std::{
 
 use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET};
 use futures::channel::mpsc::{channel, Sender};
+use indexmap::IndexMap;
 use parking_lot::{Mutex, RwLock};
 use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority};
 use sp_blockchain::HashAndNumber;
@@ -411,7 +412,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 	/// Transactions that are missing from the pool are not submitted.
 	pub fn resubmit(
 		&self,
-		mut updated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
+		mut updated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
 	) {
 		#[derive(Debug, Clone, Copy, PartialEq)]
 		enum Status {
@@ -446,7 +447,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 				let removed = pool.remove_subtree(&[hash]);
 				for removed_tx in removed {
 					let removed_hash = removed_tx.hash;
-					let updated_transaction = updated_transactions.remove(&removed_hash);
+					let updated_transaction = updated_transactions.shift_remove(&removed_hash);
 					let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
 						updated_tx
 					} else {
@@ -463,7 +464,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 					txs_to_resubmit.push((removed_hash, tx_to_resubmit));
 				}
 				// make sure to remove the hash even if it's not present in the pool anymore.
-				updated_transactions.remove(&hash);
+				updated_transactions.shift_remove(&hash);
 			}
 
 			// if we're rejecting future transactions, then insertion order matters here:
@@ -692,27 +693,24 @@ impl<B: ChainApi> ValidatedPool<B> {
 	/// to prevent them from entering the pool right away.
 	/// Note this is not the case for the dependent transactions - those may
 	/// still be valid so we want to be able to re-import them.
+	///
+	/// For every removed transaction an Invalid event is triggered.
+	///
+	/// Returns the list of transactions actually removed, which may include transactions dependent
+	/// on the provided set.
 	pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
 		// early exit in case there is no invalid transactions.
 		if hashes.is_empty() {
 			return vec![]
 		}
 
-		log::trace!(target: LOG_TARGET, "Removing invalid transactions: {:?}", hashes.len());
-
-		// temporarily ban invalid transactions
-		self.rotator.ban(&Instant::now(), hashes.iter().cloned());
-
-		let invalid = self.pool.write().remove_subtree(hashes);
+		let invalid = self.remove_subtree(hashes, |listener, removed_tx_hash| {
+			listener.invalid(&removed_tx_hash);
+		});
 
-		log::trace!(target: LOG_TARGET, "Removed invalid transactions: {:?}", invalid.len());
+		log::trace!(target: LOG_TARGET, "Removed invalid transactions: {:?}/{:?}", hashes.len(), invalid.len());
 		log_xt_trace!(target: LOG_TARGET, invalid.iter().map(|t| t.hash), "Removed invalid transaction");
 
-		let mut listener = self.listener.write();
-		for tx in &invalid {
-			listener.invalid(&tx.hash);
-		}
-
 		invalid
 	}
 
@@ -781,6 +779,9 @@ impl<B: ChainApi> ValidatedPool<B> {
 	/// This function traverses the dependency graph of transactions and removes the specified
 	/// transaction along with all its descendant transactions from the pool.
 	///
+	/// The root transaction will be banned from re-entering the pool. Descendant transactions may
+	/// be re-submitted to the pool if required.
+	///
 	/// A `listener_action` callback function is invoked for every transaction that is removed,
 	/// providing a reference to the pool's listener and the hash of the removed transaction. This
 	/// allows to trigger the required events.
@@ -789,21 +790,23 @@ impl<B: ChainApi> ValidatedPool<B> {
 	/// transaction specified by `tx_hash`.
 	pub fn remove_subtree<F>(
 		&self,
-		tx_hash: ExtrinsicHash<B>,
+		hashes: &[ExtrinsicHash<B>],
 		listener_action: F,
-	) -> Vec<ExtrinsicHash<B>>
+	) -> Vec<TransactionFor<B>>
 	where
 		F: Fn(&mut Listener<B>, ExtrinsicHash<B>),
 	{
-		self.pool
-			.write()
-			.remove_subtree(&[tx_hash])
+		// temporarily ban invalid transactions
+		self.rotator.ban(&Instant::now(), hashes.iter().cloned());
+		let removed = self.pool.write().remove_subtree(hashes);
+
+		removed
 			.into_iter()
 			.map(|tx| {
 				let removed_tx_hash = tx.hash;
 				let mut listener = self.listener.write();
 				listener_action(&mut *listener, removed_tx_hash);
-				removed_tx_hash
+				tx.clone()
 			})
 			.collect::<Vec<_>>()
 	}
diff --git a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
index 2a691ae35eaf78549ca145d41d933a37edd64d1d..ffcade085916041d4a936a53b0f6ee0298faed75 100644
--- a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
+++ b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
@@ -18,17 +18,17 @@
 
 //! Pool periodic revalidation.
 
-use std::{
-	collections::{BTreeMap, HashMap, HashSet},
-	pin::Pin,
-	sync::Arc,
-};
-
 use crate::graph::{BlockHash, ChainApi, ExtrinsicHash, Pool, ValidatedTransaction};
+use indexmap::IndexMap;
 use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
 use sp_runtime::{
 	generic::BlockId, traits::SaturatedConversion, transaction_validity::TransactionValidityError,
 };
+use std::{
+	collections::{BTreeMap, HashMap, HashSet},
+	pin::Pin,
+	sync::Arc,
+};
 
 use futures::prelude::*;
 use std::time::Duration;
@@ -84,7 +84,7 @@ async fn batch_revalidate<Api: ChainApi>(
 	};
 
 	let mut invalid_hashes = Vec::new();
-	let mut revalidated = HashMap::new();
+	let mut revalidated = IndexMap::new();
 
 	let validation_results = futures::future::join_all(batch.into_iter().filter_map(|ext_hash| {
 		pool.validated_pool().ready_by_hash(&ext_hash).map(|ext| {
diff --git a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs
index 3598f9dbc2af15f3b56cb8ccd8fce91777716abc..9f4d63f3ba3a9ab4354b14f25f1eb9d7d901127a 100644
--- a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs
+++ b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs
@@ -39,13 +39,16 @@ use prometheus_endpoint::Registry as PrometheusRegistry;
 use sc_transaction_pool_api::{
 	error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool,
 	PoolStatus, TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor,
-	TxHash,
+	TxHash, TxInvalidityReportMap,
 };
 use sp_blockchain::{HashAndNumber, TreeRoute};
 use sp_core::traits::SpawnEssentialNamed;
 use sp_runtime::{
 	generic::BlockId,
-	traits::{AtLeast32Bit, Block as BlockT, Header as HeaderT, NumberFor, Zero},
+	traits::{
+		AtLeast32Bit, Block as BlockT, Header as HeaderT, NumberFor, SaturatedConversion, Zero,
+	},
+	transaction_validity::TransactionValidityError,
 };
 use std::{
 	collections::{HashMap, HashSet},
@@ -323,8 +326,13 @@ where
 		.map(|mut outcome| outcome.expect_watcher().into_stream().boxed())
 	}
 
-	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
-		let removed = self.pool.validated_pool().remove_invalid(hashes);
+	fn report_invalid(
+		&self,
+		_at: Option<<Self::Block as BlockT>::Hash>,
+		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
+	) -> Vec<Arc<Self::InPoolTransaction>> {
+		let hashes = invalid_tx_errors.keys().map(|h| *h).collect::<Vec<_>>();
+		let removed = self.pool.validated_pool().remove_invalid(&hashes);
 		self.metrics
 			.report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64));
 		removed
@@ -459,10 +467,6 @@ where
 		at: Block::Hash,
 		xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
 	) -> Result<Self::Hash, Self::Error> {
-		use sp_runtime::{
-			traits::SaturatedConversion, transaction_validity::TransactionValidityError,
-		};
-
 		let validity = self
 			.api
 			.validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))?
diff --git a/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs b/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs
index e373c0278d804587642c6735a1d3d3daa8a03fb3..b86fde73fdaacaf70301e0305285c6bd66192c04 100644
--- a/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs
+++ b/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs
@@ -28,7 +28,7 @@ use async_trait::async_trait;
 use sc_transaction_pool_api::{
 	ChainEvent, ImportNotificationStream, LocalTransactionFor, LocalTransactionPool,
 	MaintainedTransactionPool, PoolStatus, ReadyTransactions, TransactionFor, TransactionPool,
-	TransactionSource, TransactionStatusStreamFor, TxHash,
+	TransactionSource, TransactionStatusStreamFor, TxHash, TxInvalidityReportMap,
 };
 use sp_runtime::traits::Block as BlockT;
 use std::{collections::HashMap, pin::Pin, sync::Arc};
@@ -107,8 +107,12 @@ where
 		self.0.ready()
 	}
 
-	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
-		self.0.remove_invalid(hashes)
+	fn report_invalid(
+		&self,
+		at: Option<<Self::Block as BlockT>::Hash>,
+		invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
+	) -> Vec<Arc<Self::InPoolTransaction>> {
+		self.0.report_invalid(at, invalid_tx_errors)
 	}
 
 	fn futures(&self) -> Vec<Self::InPoolTransaction> {
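
The wrapper only forwards the call. For a call site that used to pass a plain slice to `remove_invalid`, a minimal migration sketch (assuming, as above, that `TxInvalidityReportMap` collects from `(hash, Option<TransactionValidityError>)` pairs) could look like:

```rust
use sc_transaction_pool_api::{TransactionPool, TxHash, TxInvalidityReportMap};

// Before: `pool.remove_invalid(&hashes)`.
// After: report the same hashes with no specific error and no block hint;
// on the single-state pool this keeps the old "just remove them" semantics.
fn remove_without_error<P: TransactionPool>(pool: &P, hashes: Vec<TxHash<P>>) {
	let report: TxInvalidityReportMap<TxHash<P>> =
		hashes.into_iter().map(|h| (h, None)).collect();
	let _removed = pool.report_invalid(None, report);
}
```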
diff --git a/substrate/client/transaction-pool/tests/fatp.rs b/substrate/client/transaction-pool/tests/fatp.rs
index dd82c52a6047b865c516ddf99197bec638cbfe48..a4a932dd8536186c6ff9f1ff5e0dc207e42b55c4 100644
--- a/substrate/client/transaction-pool/tests/fatp.rs
+++ b/substrate/client/transaction-pool/tests/fatp.rs
@@ -25,8 +25,8 @@ use fatp_common::{
 use futures::{executor::block_on, task::Poll, FutureExt, StreamExt};
 use sc_transaction_pool::ChainApi;
 use sc_transaction_pool_api::{
-	error::{Error as TxPoolError, IntoPoolError},
-	ChainEvent, MaintainedTransactionPool, TransactionPool, TransactionStatus,
+	error::Error as TxPoolError, ChainEvent, MaintainedTransactionPool, TransactionPool,
+	TransactionStatus,
 };
 use sp_runtime::transaction_validity::InvalidTransaction;
 use std::{sync::Arc, time::Duration};
@@ -788,11 +788,12 @@ fn fatp_linear_progress_finalization() {
 	let f00 = forks[0][0].hash();
 	let f12 = forks[1][2].hash();
 	let f14 = forks[1][4].hash();
+	let f15 = forks[1][5].hash();
 
 	let event = new_best_block_event(&pool, None, f00);
 	block_on(pool.maintain(event));
 
-	let xt0 = uxt(Bob, 204);
+	let xt0 = uxt(Bob, 205);
 	let submissions = vec![pool.submit_one(invalid_hash(), SOURCE, xt0.clone())];
 	block_on(futures::future::join_all(submissions));
 
@@ -803,13 +804,13 @@ fn fatp_linear_progress_finalization() {
 
 	log::debug!(target:LOG_TARGET, "stats: {:#?}", pool.status_all());
 
-	let event = ChainEvent::Finalized { hash: f14, tree_route: Arc::from(vec![]) };
-	block_on(pool.maintain(event));
+	block_on(pool.maintain(new_best_block_event(&pool, Some(f12), f15)));
+	block_on(pool.maintain(finalized_block_event(&pool, f00, f14)));
 
 	log::debug!(target:LOG_TARGET, "stats: {:#?}", pool.status_all());
 
 	assert_eq!(pool.active_views_count(), 1);
-	assert_pool_status!(f14, &pool, 1, 0);
+	assert_pool_status!(f15, &pool, 1, 0);
 }
 
 #[test]
@@ -854,112 +855,6 @@ fn fatp_fork_finalization_removes_stale_views() {
 	assert_eq!(pool.active_views_count(), 1);
 }
 
-#[test]
-fn fatp_watcher_invalid_fails_on_submission() {
-	sp_tracing::try_init_simple();
-
-	let (pool, api, _) = pool();
-
-	let header01 = api.push_block(1, vec![], true);
-
-	let event = new_best_block_event(&pool, None, header01.hash());
-	block_on(pool.maintain(event));
-
-	let xt0 = uxt(Alice, 150);
-	api.add_invalid(&xt0);
-	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()));
-	let xt0_watcher = xt0_watcher.map(|_| ());
-
-	assert_pool_status!(header01.hash(), &pool, 0, 0);
-	assert!(matches!(
-		xt0_watcher.unwrap_err().into_pool_error(),
-		Ok(TxPoolError::InvalidTransaction(InvalidTransaction::Stale))
-	));
-}
-
-#[test]
-fn fatp_watcher_invalid_single_revalidation() {
-	sp_tracing::try_init_simple();
-
-	let (pool, api, _) = pool();
-
-	let header01 = api.push_block(1, vec![], true);
-	let event = new_best_block_event(&pool, Some(api.genesis_hash()), header01.hash());
-	block_on(pool.maintain(event));
-
-	let xt0 = uxt(Alice, 200);
-	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
-
-	api.add_invalid(&xt0);
-
-	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
-	let event = finalized_block_event(&pool, header01.hash(), header02.hash());
-	block_on(pool.maintain(event));
-
-	// wait 10 blocks for revalidation
-	let mut prev_header = header02;
-	for n in 3..=11 {
-		let header = api.push_block(n, vec![], true);
-		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
-		block_on(pool.maintain(event));
-		prev_header = header;
-	}
-
-	let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>();
-	log::debug!("xt0_events: {:#?}", xt0_events);
-	assert_eq!(xt0_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid]);
-}
-
-#[test]
-fn fatp_watcher_invalid_single_revalidation2() {
-	sp_tracing::try_init_simple();
-
-	let (pool, api, _) = pool();
-
-	let xt0 = uxt(Alice, 200);
-	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
-	assert_eq!(pool.mempool_len(), (0, 1));
-	api.add_invalid(&xt0);
-
-	let header01 = api.push_block(1, vec![], true);
-	let event = new_best_block_event(&pool, None, header01.hash());
-	block_on(pool.maintain(event));
-
-	let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>();
-	log::debug!("xt0_events: {:#?}", xt0_events);
-	assert_eq!(xt0_events, vec![TransactionStatus::Invalid]);
-	assert_eq!(pool.mempool_len(), (0, 0));
-}
-
-#[test]
-fn fatp_watcher_invalid_single_revalidation3() {
-	sp_tracing::try_init_simple();
-
-	let (pool, api, _) = pool();
-
-	let xt0 = uxt(Alice, 150);
-	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
-	assert_eq!(pool.mempool_len(), (0, 1));
-
-	let header01 = api.push_block(1, vec![], true);
-	let event = finalized_block_event(&pool, api.genesis_hash(), header01.hash());
-	block_on(pool.maintain(event));
-
-	// wait 10 blocks for revalidation
-	let mut prev_header = header01;
-	for n in 2..=11 {
-		let header = api.push_block(n, vec![], true);
-		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
-		block_on(pool.maintain(event));
-		prev_header = header;
-	}
-
-	let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>();
-	log::debug!("xt0_events: {:#?}", xt0_events);
-	assert_eq!(xt0_events, vec![TransactionStatus::Invalid]);
-	assert_eq!(pool.mempool_len(), (0, 0));
-}
-
 #[test]
 fn fatp_watcher_future() {
 	sp_tracing::try_init_simple();
@@ -976,14 +871,12 @@ fn fatp_watcher_future() {
 
 	assert_pool_status!(header01.hash(), &pool, 0, 1);
 
-	let header02 = api.push_block(2, vec![], true);
-	let event = ChainEvent::Finalized {
-		hash: header02.hash(),
-		tree_route: Arc::from(vec![header01.hash()]),
-	};
-	block_on(pool.maintain(event));
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	let header03 = api.push_block_with_parent(header02.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header03.hash())));
+	block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash())));
 
-	assert_pool_status!(header02.hash(), &pool, 0, 1);
+	assert_pool_status!(header03.hash(), &pool, 0, 1);
 
 	let xt0_events = block_on(xt0_watcher.take(1).collect::<Vec<_>>());
 	assert_eq!(xt0_events, vec![TransactionStatus::Future]);
@@ -1107,15 +1000,12 @@ fn fatp_watcher_future_and_finalized() {
 
 	assert_pool_status!(header01.hash(), &pool, 1, 1);
 
-	let header02 = api.push_block(2, vec![xt0], true);
-	let event = ChainEvent::Finalized {
-		hash: header02.hash(),
-		tree_route: Arc::from(vec![header01.hash()]),
-	};
-	// let event = new_best_block_event(&pool, Some(header01.hash()), header02.hash());
-	block_on(pool.maintain(event));
+	let header02 = api.push_block_with_parent(header01.hash(), vec![xt0], true);
+	let header03 = api.push_block_with_parent(header02.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header03.hash())));
+	block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash())));
 
-	assert_pool_status!(header02.hash(), &pool, 0, 1);
+	assert_pool_status!(header03.hash(), &pool, 0, 1);
 
 	let xt1_status = block_on(xt1_watcher.take(1).collect::<Vec<_>>());
 	assert_eq!(xt1_status, vec![TransactionStatus::Future]);
@@ -1835,180 +1725,6 @@ fn fatp_watcher_best_block_after_finalization_does_not_retract() {
 	);
 }
 
-#[test]
-fn fatp_watcher_invalid_many_revalidation() {
-	sp_tracing::try_init_simple();
-
-	let (pool, api, _) = pool();
-
-	let header01 = api.push_block(1, vec![], true);
-	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
-
-	let xt0 = uxt(Alice, 200);
-	let xt1 = uxt(Alice, 201);
-	let xt2 = uxt(Alice, 202);
-	let xt3 = uxt(Alice, 203);
-	let xt4 = uxt(Alice, 204);
-
-	let submissions = vec![
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone()),
-	];
-
-	let submissions = block_on(futures::future::join_all(submissions));
-	assert_eq!(pool.status_all()[&header01.hash()].ready, 5);
-
-	let mut watchers = submissions.into_iter().map(Result::unwrap).collect::<Vec<_>>();
-	let xt4_watcher = watchers.remove(4);
-	let xt3_watcher = watchers.remove(3);
-	let xt2_watcher = watchers.remove(2);
-	let xt1_watcher = watchers.remove(1);
-	let xt0_watcher = watchers.remove(0);
-
-	api.add_invalid(&xt3);
-	api.add_invalid(&xt4);
-
-	let header02 = api.push_block(2, vec![], true);
-	block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash())));
-
-	//todo: shall revalidation check finalized (fork's tip) view?
-	assert_eq!(pool.status_all()[&header02.hash()].ready, 5);
-
-	let header03 = api.push_block(3, vec![xt0.clone(), xt1.clone(), xt2.clone()], true);
-	block_on(pool.maintain(finalized_block_event(&pool, header02.hash(), header03.hash())));
-
-	// wait 10 blocks for revalidation
-	let mut prev_header = header03.clone();
-	for n in 4..=11 {
-		let header = api.push_block(n, vec![], true);
-		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
-		block_on(pool.maintain(event));
-		prev_header = header;
-	}
-
-	let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>();
-	let xt1_events = futures::executor::block_on_stream(xt1_watcher).collect::<Vec<_>>();
-	let xt2_events = futures::executor::block_on_stream(xt2_watcher).collect::<Vec<_>>();
-	let xt3_events = futures::executor::block_on_stream(xt3_watcher).collect::<Vec<_>>();
-	let xt4_events = futures::executor::block_on_stream(xt4_watcher).collect::<Vec<_>>();
-
-	log::debug!("xt0_events: {:#?}", xt0_events);
-	log::debug!("xt1_events: {:#?}", xt1_events);
-	log::debug!("xt2_events: {:#?}", xt2_events);
-	log::debug!("xt3_events: {:#?}", xt3_events);
-	log::debug!("xt4_events: {:#?}", xt4_events);
-
-	assert_eq!(
-		xt0_events,
-		vec![
-			TransactionStatus::Ready,
-			TransactionStatus::InBlock((header03.hash(), 0)),
-			TransactionStatus::Finalized((header03.hash(), 0))
-		],
-	);
-	assert_eq!(
-		xt1_events,
-		vec![
-			TransactionStatus::Ready,
-			TransactionStatus::InBlock((header03.hash(), 1)),
-			TransactionStatus::Finalized((header03.hash(), 1))
-		],
-	);
-	assert_eq!(
-		xt2_events,
-		vec![
-			TransactionStatus::Ready,
-			TransactionStatus::InBlock((header03.hash(), 2)),
-			TransactionStatus::Finalized((header03.hash(), 2))
-		],
-	);
-	assert_eq!(xt3_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid],);
-	assert_eq!(xt4_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid],);
-}
-
-#[test]
-fn should_not_retain_invalid_hashes_from_retracted() {
-	sp_tracing::try_init_simple();
-
-	let (pool, api, _) = pool();
-	let xt = uxt(Alice, 200);
-
-	let header01 = api.push_block(1, vec![], true);
-	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
-	let watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt.clone())).unwrap();
-
-	let header02a = api.push_block_with_parent(header01.hash(), vec![xt.clone()], true);
-
-	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02a.hash())));
-	assert_eq!(pool.status_all()[&header02a.hash()].ready, 0);
-
-	api.add_invalid(&xt);
-	let header02b = api.push_block_with_parent(header01.hash(), vec![], true);
-	block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02b.hash())));
-
-	// wait 10 blocks for revalidation
-	let mut prev_header = header02b.clone();
-	for _ in 3..=11 {
-		let header = api.push_block_with_parent(prev_header.hash(), vec![], true);
-		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
-		block_on(pool.maintain(event));
-		prev_header = header;
-	}
-
-	assert_eq!(
-		futures::executor::block_on_stream(watcher).collect::<Vec<_>>(),
-		vec![
-			TransactionStatus::Ready,
-			TransactionStatus::InBlock((header02a.hash(), 0)),
-			TransactionStatus::Invalid
-		],
-	);
-
-	//todo: shall revalidation check finalized (fork's tip) view?
-	assert_eq!(pool.status_all()[&prev_header.hash()].ready, 0);
-}
-
-#[test]
-fn should_revalidate_during_maintenance() {
-	sp_tracing::try_init_simple();
-
-	let (pool, api, _) = pool();
-	let xt1 = uxt(Alice, 200);
-	let xt2 = uxt(Alice, 201);
-
-	let header01 = api.push_block(1, vec![], true);
-	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
-
-	block_on(pool.submit_one(invalid_hash(), SOURCE, xt1.clone())).unwrap();
-	let watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
-	assert_eq!(pool.status_all()[&header01.hash()].ready, 2);
-	assert_eq!(api.validation_requests().len(), 2);
-
-	let header02 = api.push_block(2, vec![xt1.clone()], true);
-	api.add_invalid(&xt2);
-	block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash())));
-
-	//todo: shall revalidation check finalized (fork's tip) view?
-	assert_eq!(pool.status_all()[&header02.hash()].ready, 1);
-
-	// wait 10 blocks for revalidation
-	let mut prev_header = header02.clone();
-	for _ in 3..=11 {
-		let header = api.push_block_with_parent(prev_header.hash(), vec![], true);
-		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
-		block_on(pool.maintain(event));
-		prev_header = header;
-	}
-
-	assert_eq!(
-		futures::executor::block_on_stream(watcher).collect::<Vec<_>>(),
-		vec![TransactionStatus::Ready, TransactionStatus::Invalid],
-	);
-}
-
 #[test]
 fn fatp_transactions_purging_stale_on_finalization_works() {
 	sp_tracing::try_init_simple();
@@ -2057,53 +1773,6 @@ fn fatp_transactions_purging_stale_on_finalization_works() {
 	);
 }
 
-#[test]
-fn fatp_transactions_purging_invalid_on_finalization_works() {
-	sp_tracing::try_init_simple();
-
-	let (pool, api, _) = pool();
-
-	let xt1 = uxt(Alice, 200);
-	let xt2 = uxt(Alice, 201);
-	let xt3 = uxt(Alice, 202);
-
-	let header01 = api.push_block(1, vec![], true);
-	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
-
-	let watcher1 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
-	let watcher2 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
-	block_on(pool.submit_one(invalid_hash(), SOURCE, xt3.clone())).unwrap();
-
-	assert_eq!(api.validation_requests().len(), 3);
-	assert_eq!(pool.status_all()[&header01.hash()].ready, 3);
-	assert_eq!(pool.mempool_len(), (1, 2));
-
-	let header02 = api.push_block(2, vec![], true);
-	api.add_invalid(&xt1);
-	api.add_invalid(&xt2);
-	api.add_invalid(&xt3);
-	block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash())));
-
-	// wait 10 blocks for revalidation
-	let mut prev_header = header02;
-	for n in 3..=13 {
-		let header = api.push_block(n, vec![], true);
-		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
-		block_on(pool.maintain(event));
-		prev_header = header;
-	}
-
-	//todo: should it work at all? (it requires better revalidation: mempool keeping validated txs)
-	//additionally it also requires revalidation of finalized view.
-	// assert_eq!(pool.status_all()[&header02.hash()].ready, 0);
-	assert_eq!(pool.mempool_len(), (0, 0));
-
-	let xt1_events = futures::executor::block_on_stream(watcher1).collect::<Vec<_>>();
-	let xt2_events = futures::executor::block_on_stream(watcher2).collect::<Vec<_>>();
-	assert_eq!(xt1_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid]);
-	assert_eq!(xt2_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid]);
-}
-
 #[test]
 fn import_sink_works() {
 	sp_tracing::try_init_simple();
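
The invalid-transaction tests removed above reappear, together with new ones, in the dedicated `fatp_invalid.rs` file below. They all repeat the same "wait 10 blocks for revalidation" loop; a test-local macro along these lines (a sketch built only from helpers already used in these tests, not something this PR adds) could express it once:

```rust
// Sketch only: push `$n` empty blocks on top of `$prev` and finalize each one,
// giving the fork-aware pool's background revalidation a chance to run.
// Evaluates to the last pushed header.
macro_rules! finalize_empty_blocks {
	($pool:expr, $api:expr, $prev:expr, $n:expr) => {{
		let mut prev_header = $prev.clone();
		for _ in 0..$n {
			let header = $api.push_block_with_parent(prev_header.hash(), vec![], true);
			let event = finalized_block_event(&$pool, prev_header.hash(), header.hash());
			block_on($pool.maintain(event));
			prev_header = header;
		}
		prev_header
	}};
}
```

A test would then replace the explicit loop with `let _tip = finalize_empty_blocks!(pool, api, header02, 10);`.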
diff --git a/substrate/client/transaction-pool/tests/fatp_invalid.rs b/substrate/client/transaction-pool/tests/fatp_invalid.rs
new file mode 100644
index 0000000000000000000000000000000000000000..0076d2a7cbbfbad5f5c5dce15b7d45d4aa49a5ab
--- /dev/null
+++ b/substrate/client/transaction-pool/tests/fatp_invalid.rs
@@ -0,0 +1,690 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! Tests of invalid transaction handling for the fork-aware transaction pool.
+
+pub mod fatp_common;
+
+use fatp_common::{
+	finalized_block_event, invalid_hash, new_best_block_event, pool, TestPoolBuilder, LOG_TARGET,
+	SOURCE,
+};
+use futures::{executor::block_on, FutureExt};
+use sc_transaction_pool::ChainApi;
+use sc_transaction_pool_api::{
+	error::{Error as TxPoolError, IntoPoolError},
+	MaintainedTransactionPool, TransactionPool, TransactionStatus,
+};
+use sp_runtime::transaction_validity::{InvalidTransaction, TransactionValidityError};
+use substrate_test_runtime_client::Sr25519Keyring::*;
+use substrate_test_runtime_transaction_pool::uxt;
+
+#[test]
+fn fatp_invalid_three_views_stale_gets_rejected() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = pool();
+
+	let header01 = api.push_block(1, vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Alice, 200);
+
+	let header02a = api.push_block_with_parent(header01.hash(), vec![], true);
+	let header02b = api.push_block_with_parent(header01.hash(), vec![], true);
+	let header02c = api.push_block_with_parent(header01.hash(), vec![], true);
+	api.set_nonce(header02a.hash(), Alice.into(), 201);
+	api.set_nonce(header02b.hash(), Alice.into(), 201);
+	api.set_nonce(header02c.hash(), Alice.into(), 201);
+
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02a.hash())));
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header02a.hash()), header02b.hash())));
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header02b.hash()), header02c.hash())));
+
+	let result0 = block_on(pool.submit_one(invalid_hash(), SOURCE, xt0.clone()));
+	let result1 = block_on(pool.submit_one(invalid_hash(), SOURCE, xt1.clone()));
+
+	assert!(matches!(
+		result0.as_ref().unwrap_err().0,
+		TxPoolError::InvalidTransaction(InvalidTransaction::Stale)
+	));
+	assert!(matches!(
+		result1.as_ref().unwrap_err().0,
+		TxPoolError::InvalidTransaction(InvalidTransaction::Stale)
+	));
+}
+
+#[test]
+fn fatp_invalid_three_views_invalid_gets_rejected() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = pool();
+
+	let header01 = api.push_block(1, vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Alice, 200);
+	let header02a = api.push_block_with_parent(header01.hash(), vec![], true);
+	let header02b = api.push_block_with_parent(header01.hash(), vec![], true);
+	let header02c = api.push_block_with_parent(header01.hash(), vec![], true);
+
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02a.hash())));
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header02a.hash()), header02b.hash())));
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header02b.hash()), header02c.hash())));
+
+	api.add_invalid(&xt0);
+	api.add_invalid(&xt1);
+
+	let result0 = block_on(pool.submit_one(invalid_hash(), SOURCE, xt0.clone()));
+	let result1 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).map(|_| ());
+
+	assert!(matches!(
+		result0.as_ref().unwrap_err().0,
+		TxPoolError::InvalidTransaction(InvalidTransaction::Custom(_))
+	));
+	assert!(matches!(
+		result1.as_ref().unwrap_err().0,
+		TxPoolError::InvalidTransaction(InvalidTransaction::Custom(_))
+	));
+}
+
+#[test]
+fn fatp_transactions_purging_invalid_on_finalization_works() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = pool();
+
+	let xt1 = uxt(Alice, 200);
+	let xt2 = uxt(Alice, 201);
+	let xt3 = uxt(Alice, 202);
+
+	let header01 = api.push_block(1, vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
+
+	let watcher1 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let watcher2 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let watcher3 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_eq!(api.validation_requests().len(), 3);
+	assert_eq!(pool.status_all()[&header01.hash()].ready, 3);
+	assert_eq!(pool.mempool_len(), (0, 3));
+
+	let header02 = api.push_block(2, vec![], true);
+	api.add_invalid(&xt1);
+	api.add_invalid(&xt2);
+	api.add_invalid(&xt3);
+	block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash())));
+
+	// wait 10 blocks for revalidation
+	let mut prev_header = header02.clone();
+	for n in 3..=11 {
+		let header = api.push_block(n, vec![], true);
+		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
+		block_on(pool.maintain(event));
+		prev_header = header;
+	}
+
+	assert_eq!(pool.mempool_len(), (0, 0));
+
+	assert_watcher_stream!(watcher1, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+	assert_watcher_stream!(watcher2, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+	assert_watcher_stream!(watcher3, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+}
+
+#[test]
+fn fatp_transactions_purging_invalid_on_finalization_works2() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = pool();
+	let xt1 = uxt(Alice, 200);
+	let xt2 = uxt(Alice, 201);
+
+	let header01 = api.push_block(1, vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
+	block_on(pool.submit_one(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+
+	let watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	assert_eq!(pool.status_all()[&header01.hash()].ready, 2);
+	assert_eq!(api.validation_requests().len(), 2);
+
+	let header02 = api.push_block(2, vec![xt1.clone()], true);
+	api.add_invalid(&xt2);
+	block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash())));
+
+	assert_eq!(pool.status_all()[&header02.hash()].ready, 1);
+
+	// wait 10 blocks for revalidation
+	let mut prev_header = header02.clone();
+	for _ in 3..=11 {
+		let header = api.push_block_with_parent(prev_header.hash(), vec![], true);
+		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
+		block_on(pool.maintain(event));
+		prev_header = header;
+	}
+
+	assert_watcher_stream!(watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+	assert_eq!(pool.status_all()[&prev_header.hash()].ready, 0);
+}
+
+#[test]
+fn should_not_retain_invalid_hashes_from_retracted() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = pool();
+	let xt = uxt(Alice, 200);
+
+	let header01 = api.push_block(1, vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
+	let watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt.clone())).unwrap();
+
+	let header02a = api.push_block_with_parent(header01.hash(), vec![xt.clone()], true);
+
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02a.hash())));
+	assert_eq!(pool.status_all()[&header02a.hash()].ready, 0);
+
+	api.add_invalid(&xt);
+	let header02b = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02b.hash())));
+
+	// wait 10 blocks for revalidation
+	let mut prev_header = header02b.clone();
+	for _ in 3..=11 {
+		let header = api.push_block_with_parent(prev_header.hash(), vec![], true);
+		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
+		block_on(pool.maintain(event));
+		prev_header = header;
+	}
+
+	assert_watcher_stream!(
+		watcher,
+		[
+			TransactionStatus::Ready,
+			TransactionStatus::InBlock((header02a.hash(), 0)),
+			TransactionStatus::Invalid
+		]
+	);
+
+	// TODO: should revalidation check the finalized (fork tip) view?
+	assert_eq!(pool.status_all()[&prev_header.hash()].ready, 0);
+}
+
+#[test]
+fn fatp_watcher_invalid_many_revalidation() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = pool();
+
+	let header01 = api.push_block(1, vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Alice, 201);
+	let xt2 = uxt(Alice, 202);
+	let xt3 = uxt(Alice, 203);
+	let xt4 = uxt(Alice, 204);
+
+	let submissions = vec![
+		pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()),
+		pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()),
+		pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()),
+		pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone()),
+		pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone()),
+	];
+
+	let submissions = block_on(futures::future::join_all(submissions));
+	assert_eq!(pool.status_all()[&header01.hash()].ready, 5);
+
+	let mut watchers = submissions.into_iter().map(Result::unwrap).collect::<Vec<_>>();
+	let xt4_watcher = watchers.remove(4);
+	let xt3_watcher = watchers.remove(3);
+	let xt2_watcher = watchers.remove(2);
+	let xt1_watcher = watchers.remove(1);
+	let xt0_watcher = watchers.remove(0);
+
+	api.add_invalid(&xt3);
+	api.add_invalid(&xt4);
+
+	let header02 = api.push_block(2, vec![], true);
+	block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash())));
+
+	// TODO: should revalidation check the finalized (fork tip) view?
+	assert_eq!(pool.status_all()[&header02.hash()].ready, 5);
+
+	let header03 = api.push_block(3, vec![xt0.clone(), xt1.clone(), xt2.clone()], true);
+	block_on(pool.maintain(finalized_block_event(&pool, header02.hash(), header03.hash())));
+
+	// wait 10 blocks for revalidation
+	let mut prev_header = header03.clone();
+	for n in 4..=11 {
+		let header = api.push_block(n, vec![], true);
+		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
+		block_on(pool.maintain(event));
+		prev_header = header;
+	}
+
+	assert_watcher_stream!(
+		xt0_watcher,
+		[
+			TransactionStatus::Ready,
+			TransactionStatus::InBlock((header03.hash(), 0)),
+			TransactionStatus::Finalized((header03.hash(), 0))
+		]
+	);
+	assert_watcher_stream!(
+		xt1_watcher,
+		[
+			TransactionStatus::Ready,
+			TransactionStatus::InBlock((header03.hash(), 1)),
+			TransactionStatus::Finalized((header03.hash(), 1))
+		]
+	);
+	assert_watcher_stream!(
+		xt2_watcher,
+		[
+			TransactionStatus::Ready,
+			TransactionStatus::InBlock((header03.hash(), 2)),
+			TransactionStatus::Finalized((header03.hash(), 2))
+		]
+	);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+	assert_watcher_stream!(xt4_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+}
+
+#[test]
+fn fatp_watcher_invalid_fails_on_submission() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = pool();
+
+	let header01 = api.push_block(1, vec![], true);
+
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 150);
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()));
+	let xt0_watcher = xt0_watcher.map(|_| ());
+
+	assert_pool_status!(header01.hash(), &pool, 0, 0);
+	// Alice's nonce in state is 200, tx is 150.
+	assert!(matches!(
+		xt0_watcher.unwrap_err().into_pool_error(),
+		Ok(TxPoolError::InvalidTransaction(InvalidTransaction::Stale))
+	));
+}
+
+#[test]
+fn fatp_watcher_invalid_single_revalidation() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = pool();
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, Some(api.genesis_hash()), header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+
+	api.add_invalid(&xt0);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	let event = finalized_block_event(&pool, header01.hash(), header02.hash());
+	block_on(pool.maintain(event));
+
+	// wait 10 blocks for revalidation
+	let mut prev_header = header02;
+	for n in 3..=11 {
+		let header = api.push_block(n, vec![], true);
+		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
+		block_on(pool.maintain(event));
+		prev_header = header;
+	}
+
+	let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>();
+	log::debug!("xt0_events: {:#?}", xt0_events);
+	assert_eq!(xt0_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid]);
+}
+
+#[test]
+fn fatp_watcher_invalid_single_revalidation2() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = pool();
+
+	let xt0 = uxt(Alice, 200);
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	assert_eq!(pool.mempool_len(), (0, 1));
+	api.add_invalid(&xt0);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>();
+	log::debug!("xt0_events: {:#?}", xt0_events);
+	assert_eq!(xt0_events, vec![TransactionStatus::Invalid]);
+	assert_eq!(pool.mempool_len(), (0, 0));
+}
+
+#[test]
+fn fatp_watcher_invalid_single_revalidation3() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = pool();
+
+	let xt0 = uxt(Alice, 150);
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	assert_eq!(pool.mempool_len(), (0, 1));
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = finalized_block_event(&pool, api.genesis_hash(), header01.hash());
+	block_on(pool.maintain(event));
+
+	// wait 10 blocks for revalidation
+	let mut prev_header = header01;
+	for n in 2..=11 {
+		let header = api.push_block(n, vec![], true);
+		let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
+		block_on(pool.maintain(event));
+		prev_header = header;
+	}
+
+	let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>();
+	log::debug!("xt0_events: {:#?}", xt0_events);
+	assert_eq!(xt0_events, vec![TransactionStatus::Invalid]);
+	assert_eq!(pool.mempool_len(), (0, 0));
+}
+
+#[test]
+fn fatp_invalid_report_stale_or_future_works_as_expected() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = TestPoolBuilder::new().build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+
+	let header01 = api.push_block(1, vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+	let xt3 = uxt(Dave, 500);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 4, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3]);
+
+	// Future/Stale reports are ignored when `at` is `None`.
+	let xt0_report = (
+		pool.api().hash_and_length(&xt0).0,
+		Some(TransactionValidityError::Invalid(InvalidTransaction::Future)),
+	);
+	let invalid_txs = [xt0_report].into();
+	let result = pool.report_invalid(None, invalid_txs);
+	assert!(result.is_empty());
+	assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3]);
+
+	// Future/Stale reports are applied when `at` is provided.
+	let xt0_report = (
+		pool.api().hash_and_length(&xt0).0,
+		Some(TransactionValidityError::Invalid(InvalidTransaction::Future)),
+	);
+	let xt1_report = (
+		pool.api().hash_and_length(&xt1).0,
+		Some(TransactionValidityError::Invalid(InvalidTransaction::Stale)),
+	);
+	let invalid_txs = [xt0_report, xt1_report].into();
+	let result = pool.report_invalid(Some(header01.hash()), invalid_txs);
+	// Stale/Future do not cause the tx to be removed from the pool, only from the view.
+	assert!(result.is_empty());
+	// assert_eq!(result[0].hash, pool.api().hash_and_length(&xt0).0);
+	assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]);
+
+	// A `None` error means forced removal from the pool.
+	// todo: cover forced removal in this test
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_invalid_report_future_dont_remove_from_pool() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = TestPoolBuilder::new().build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+	api.set_nonce(api.genesis_hash(), Eve.into(), 600);
+
+	let header01 = api.push_block(1, vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+	let xt3 = uxt(Dave, 500);
+	let xt4 = uxt(Eve, 600);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+	let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 5, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3, xt4]);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));
+
+	assert_pool_status!(header02.hash(), &pool, 5, 0);
+	assert_ready_iterator!(header02.hash(), pool, [xt0, xt1, xt2, xt3, xt4]);
+
+	let xt0_report = (
+		pool.api().hash_and_length(&xt0).0,
+		Some(TransactionValidityError::Invalid(InvalidTransaction::Stale)),
+	);
+	let xt1_report = (
+		pool.api().hash_and_length(&xt1).0,
+		Some(TransactionValidityError::Invalid(InvalidTransaction::Future)),
+	);
+	let xt4_report = (
+		pool.api().hash_and_length(&xt4).0,
+		Some(TransactionValidityError::Invalid(InvalidTransaction::BadProof)),
+	);
+	let invalid_txs = [xt0_report, xt1_report, xt4_report].into();
+	let result = pool.report_invalid(Some(header01.hash()), invalid_txs);
+
+	assert_watcher_stream!(xt4_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+
+	// Stale/Future do not cause the tx to be removed from the pool; only the
+	// transaction reported with `BadProof` is removed.
+	assert_eq!(result.len(), 1);
+	assert_eq!(result[0].hash, pool.api().hash_and_length(&xt4).0);
+	assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]);
+
+	assert_pool_status!(header02.hash(), &pool, 4, 0);
+	assert_ready_iterator!(header02.hash(), pool, [xt0, xt1, xt2, xt3]);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_invalid_tx_is_removed_from_the_pool() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = TestPoolBuilder::new().build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+
+	let header01 = api.push_block(1, vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+	let xt3 = uxt(Dave, 500);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 4, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3]);
+
+	let xt0_report = (
+		pool.api().hash_and_length(&xt0).0,
+		Some(TransactionValidityError::Invalid(InvalidTransaction::BadProof)),
+	);
+	let xt1_report = (pool.api().hash_and_length(&xt1).0, None);
+	let invalid_txs = [xt0_report, xt1_report].into();
+	let result = pool.report_invalid(Some(header01.hash()), invalid_txs);
+	assert!(result.iter().any(|tx| tx.hash == pool.api().hash_and_length(&xt0).0));
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));
+	assert_pool_status!(header02.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header02.hash(), pool, [xt2, xt3]);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+	assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_invalid_tx_is_removed_from_the_pool_future_subtree_stays() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = TestPoolBuilder::new().build();
+
+	let header01 = api.push_block(1, vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Alice, 201);
+	let xt2 = uxt(Alice, 202);
+	let xt3 = uxt(Alice, 203);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 4, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3]);
+
+	let xt0_report = (
+		pool.api().hash_and_length(&xt0).0,
+		Some(TransactionValidityError::Invalid(InvalidTransaction::BadProof)),
+	);
+	let invalid_txs = [xt0_report].into();
+	let result = pool.report_invalid(Some(header01.hash()), invalid_txs);
+	assert_eq!(result[0].hash, pool.api().hash_and_length(&xt0).0);
+	assert_pool_status!(header01.hash(), &pool, 0, 0);
+	assert_ready_iterator!(header01.hash(), pool, []);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));
+	assert_pool_status!(header02.hash(), &pool, 0, 3);
+	assert_future_iterator!(header02.hash(), pool, [xt1, xt2, xt3]);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_invalid_tx_is_removed_from_the_pool2() {
+	sp_tracing::try_init_simple();
+
+	let (pool, api, _) = TestPoolBuilder::new().build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+
+	let header01 = api.push_block(1, vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash())));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+	let xt3 = uxt(Dave, 500);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 4, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3]);
+
+	let header02a = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02a.hash())));
+	assert_pool_status!(header02a.hash(), &pool, 4, 0);
+	assert_ready_iterator!(header02a.hash(), pool, [xt0, xt1, xt2, xt3]);
+
+	let header02b = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header02a.hash()), header02b.hash())));
+
+	assert_pool_status!(header02b.hash(), &pool, 4, 0);
+	assert_ready_iterator!(header02b.hash(), pool, [xt0, xt1, xt2, xt3]);
+
+	let xt0_report = (
+		pool.api().hash_and_length(&xt0).0,
+		Some(TransactionValidityError::Invalid(InvalidTransaction::BadProof)),
+	);
+	let xt1_report = (pool.api().hash_and_length(&xt1).0, None);
+	let invalid_txs = [xt0_report, xt1_report].into();
+	let result = pool.report_invalid(Some(header01.hash()), invalid_txs);
+	assert!(result.iter().any(|tx| tx.hash == pool.api().hash_and_length(&xt0).0));
+	assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]);
+	assert_pool_status!(header02a.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header02a.hash(), pool, [xt2, xt3]);
+	assert_pool_status!(header02b.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header02b.hash(), pool, [xt2, xt3]);
+
+	let header03 = api.push_block_with_parent(header02b.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header02b.hash()), header03.hash())));
+	assert_pool_status!(header03.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header03.hash(), pool, [xt2, xt3]);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]);
+	assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]);
+}