From f4743b009280e47398790bd85943819540a9ce0a Mon Sep 17 00:00:00 2001
From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>
Date: Tue, 14 Jan 2025 14:09:01 +0100
Subject: [PATCH] `fatxpool`: proper handling of priorities when mempool is
 full (#6647)

Higher-priority transactions can now replace lower-priority transactions
even when the internal _tx_mem_pool_ is full.

**Notes for reviewers:**
- The _tx_mem_pool_ now maintains information about transaction
priority. Although _tx_mem_pool_ itself is stateless, transaction
priority is updated after submission to the view. An alternative
approach could involve validating transactions at the `at` block, but
this is computationally expensive. To avoid additional validation
overhead, I opted to use the priority obtained from runtime during
submission to the view. This is the rationale behind introducing the
`SubmitOutcome` struct, which synchronously communicates transaction
priority from the view to the pool. This results in a very brief window
during which the transaction priority remains unknown - those
transaction are not taken into consideration while dropping takes place.
In the future, if needed, we could update transaction priority using
view revalidation results to keep this information fully up-to-date (as
priority of transaction may change with chain-state evolution).
- When _tx_mem_pool_ becomes full (an event anticipated to be rare),
transaction priority must be known to perform priority-based removal. In
such cases, the most recent block known is utilized for validation. I
think that speculative submission to the view and re-using the priority
from this submission would be an unnecessary complication.
- Once the priority is determined, lower-priority transactions whose
cumulative size meets or exceeds the size of the new transaction are
collected to ensure the pool size limit is not exceeded.
- Transaction removed from _tx_mem_pool_ , also needs to be removed from
all the views with appropriate event (which is done by
`remove_transaction_subtree`). To ensure complete removal, the
`PendingTxReplacement` struct was re-factored to more generic
`PendingPreInsertTask` (introduced in #6405) which covers removal and
submssion of transaction in the view which may be potentially created in
the background. This is to ensure that removed transaction will not
re-enter to the newly created view.
- `submit_local` implementation was also improved to properly handle
priorities in case when mempool is full. Some missing tests for this
method were also added.

Closes: #5809

---------

Co-authored-by: command-bot <>
Co-authored-by: Iulian Barbu <14218860+iulianbarbu@users.noreply.github.com>
---
 prdoc/pr_6647.prdoc                           |   8 +
 .../src/fork_aware_txpool/dropped_watcher.rs  |  18 +-
 .../fork_aware_txpool/fork_aware_txpool.rs    | 238 +++++++++--
 .../src/fork_aware_txpool/tx_mem_pool.rs      | 402 ++++++++++++++++--
 .../src/fork_aware_txpool/view.rs             |  22 +-
 .../src/fork_aware_txpool/view_store.rs       | 261 +++++++++---
 .../transaction-pool/src/graph/base_pool.rs   |  44 +-
 .../transaction-pool/src/graph/listener.rs    |   4 +-
 .../client/transaction-pool/src/graph/mod.rs  |   8 +-
 .../client/transaction-pool/src/graph/pool.rs |  84 ++--
 .../transaction-pool/src/graph/ready.rs       |  10 +-
 .../transaction-pool/src/graph/tracked_map.rs |   5 +
 .../src/graph/validated_pool.rs               | 119 +++++-
 .../src/single_state_txpool/revalidation.rs   |   5 +-
 .../single_state_txpool.rs                    |  30 +-
 .../transaction-pool/tests/fatp_common/mod.rs |  19 +-
 .../transaction-pool/tests/fatp_prios.rs      | 317 +++++++++++++-
 .../client/transaction-pool/tests/pool.rs     |  14 +-
 .../runtime/transaction-pool/src/lib.rs       |  36 +-
 19 files changed, 1393 insertions(+), 251 deletions(-)
 create mode 100644 prdoc/pr_6647.prdoc

diff --git a/prdoc/pr_6647.prdoc b/prdoc/pr_6647.prdoc
new file mode 100644
index 00000000000..47af9924ef1
--- /dev/null
+++ b/prdoc/pr_6647.prdoc
@@ -0,0 +1,8 @@
+title: '`fatxpool`: proper handling of priorities when mempool is full'
+doc:
+- audience: Node Dev
+  description: |-
+    Higher-priority transactions can now replace lower-priority transactions even when the internal _tx_mem_pool_ is full.
+crates:
+- name: sc-transaction-pool
+  bump: minor
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
index d69aa37c94a..bf61558b00b 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
@@ -53,11 +53,13 @@ pub struct DroppedTransaction<Hash> {
 }
 
 impl<Hash> DroppedTransaction<Hash> {
-	fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
+	/// Creates a new instance with reason set to `DroppedReason::Usurped(by)`.
+	pub fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
 		Self { reason: DroppedReason::Usurped(by), tx_hash }
 	}
 
-	fn new_enforced_by_limts(tx_hash: Hash) -> Self {
+	/// Creates a new instance with reason set to `DroppedReason::LimitsEnforced`.
+	pub fn new_enforced_by_limts(tx_hash: Hash) -> Self {
 		Self { reason: DroppedReason::LimitsEnforced, tx_hash }
 	}
 }
@@ -256,11 +258,13 @@ where
 				self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash);
 			},
 			TransactionStatus::Ready | TransactionStatus::InBlock(..) => {
-				// note: if future transaction was once seens as the ready we may want to treat it
-				// as ready transactions. Unreferenced future transactions are more likely to be
-				// removed when the last referencing view is removed then ready transactions.
-				// Transcaction seen as ready is likely quite close to be included in some
-				// future fork.
+				// note: if future transaction was once seen as the ready we may want to treat it
+				// as ready transaction. The rationale behind this is as follows: we want to remove
+				// unreferenced future transactions when the last referencing view is removed (to
+				// avoid clogging mempool). For ready transactions we prefer to keep them in mempool
+				// even if no view is currently referencing them. Future transcaction once seen as
+				// ready is likely quite close to be included in some future fork (it is close to be
+				// ready, so we make exception and treat such transaction as ready).
 				if let Some(mut views) = self.future_transaction_views.remove(&tx_hash) {
 					views.insert(block_hash);
 					self.ready_transaction_views.insert(tx_hash, views);
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
index e57256943cc..76604571825 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
@@ -31,7 +31,10 @@ use crate::{
 	api::FullChainApi,
 	common::log_xt::log_xt_trace,
 	enactment_state::{EnactmentAction, EnactmentState},
-	fork_aware_txpool::{dropped_watcher::DroppedReason, revalidation_worker},
+	fork_aware_txpool::{
+		dropped_watcher::{DroppedReason, DroppedTransaction},
+		revalidation_worker,
+	},
 	graph::{
 		self,
 		base_pool::{TimedTransactionSource, Transaction},
@@ -49,14 +52,16 @@ use futures::{
 use parking_lot::Mutex;
 use prometheus_endpoint::Registry as PrometheusRegistry;
 use sc_transaction_pool_api::{
-	ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolStatus, TransactionFor,
-	TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
+	error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
+	MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionPriority,
+	TransactionSource, TransactionStatusStreamFor, TxHash,
 };
 use sp_blockchain::{HashAndNumber, TreeRoute};
 use sp_core::traits::SpawnEssentialNamed;
 use sp_runtime::{
 	generic::BlockId,
 	traits::{Block as BlockT, NumberFor},
+	transaction_validity::{TransactionValidityError, ValidTransaction},
 };
 use std::{
 	collections::{HashMap, HashSet},
@@ -287,7 +292,7 @@ where
 				DroppedReason::LimitsEnforced => {},
 			};
 
-			mempool.remove_dropped_transaction(&dropped_tx_hash).await;
+			mempool.remove_transaction(&dropped_tx_hash);
 			view_store.listener.transaction_dropped(dropped);
 			import_notification_sink.clean_notified_items(&[dropped_tx_hash]);
 		}
@@ -598,7 +603,7 @@ where
 /// out:
 /// [ Ok(xth0), Ok(xth1), Err ]
 /// ```
-fn reduce_multiview_result<H, E>(input: HashMap<H, Vec<Result<H, E>>>) -> Vec<Result<H, E>> {
+fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
 	let mut values = input.values();
 	let Some(first) = values.next() else {
 		return Default::default();
@@ -650,9 +655,28 @@ where
 		let mempool_results = self.mempool.extend_unwatched(source, &xts);
 
 		if view_store.is_empty() {
-			return Ok(mempool_results.into_iter().map(|r| r.map(|r| r.hash)).collect::<Vec<_>>())
+			return Ok(mempool_results
+				.into_iter()
+				.map(|r| r.map(|r| r.hash).map_err(Into::into))
+				.collect::<Vec<_>>())
 		}
 
+		// Submit all the transactions to the mempool
+		let retries = mempool_results
+			.into_iter()
+			.zip(xts.clone())
+			.map(|(result, xt)| async move {
+				match result {
+					Err(TxPoolApiError::ImmediatelyDropped) =>
+						self.attempt_transaction_replacement(source, false, xt).await,
+					_ => result,
+				}
+			})
+			.collect::<Vec<_>>();
+
+		let mempool_results = futures::future::join_all(retries).await;
+
+		// Collect transactions that were successfully submitted to the mempool...
 		let to_be_submitted = mempool_results
 			.iter()
 			.zip(xts)
@@ -664,22 +688,47 @@ where
 		self.metrics
 			.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));
 
+		// ... and submit them to the view_store. Please note that transactions rejected by mempool
+		// are not sent here.
 		let mempool = self.mempool.clone();
 		let results_map = view_store.submit(to_be_submitted.into_iter()).await;
 		let mut submission_results = reduce_multiview_result(results_map).into_iter();
 
+		// Note for composing final result:
+		//
+		// For each failed insertion into the mempool, the mempool result should be placed into
+		// the returned vector.
+		//
+		// For each successful insertion into the mempool, the corresponding
+		// view_store submission result needs to be examined:
+		// - If there is an error during view_store submission, the transaction is removed from
+		// the mempool, and the final result recorded in the vector for this transaction is the
+		// view_store submission error.
+		//
+		// - If the view_store submission is successful, the transaction priority is updated in the
+		// mempool.
+		//
+		// Finally, it collects the hashes of updated transactions or submission errors (either
+		// from the mempool or view_store) into a returned vector.
 		Ok(mempool_results
 				.into_iter()
 				.map(|result| {
-					result.and_then(|insertion| {
-						submission_results
-							.next()
-							.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.")
-							.inspect_err(|_|
-								mempool.remove(insertion.hash)
-							)
+					result
+						.map_err(Into::into)
+						.and_then(|insertion| {
+							submission_results
+								.next()
+								.expect("The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.")
+								.inspect_err(|_|{
+									mempool.remove_transaction(&insertion.hash);
+								})
 					})
+
 				})
+				.map(|r| r.map(|r| {
+					mempool.update_transaction_priority(&r);
+					r.hash()
+				}))
 				.collect::<Vec<_>>())
 	}
 
@@ -712,10 +761,13 @@ where
 	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
 		log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count());
 		let xt = Arc::from(xt);
-		let InsertionInfo { hash: xt_hash, source: timed_source } =
+
+		let InsertionInfo { hash: xt_hash, source: timed_source, .. } =
 			match self.mempool.push_watched(source, xt.clone()) {
 				Ok(result) => result,
-				Err(e) => return Err(e),
+				Err(TxPoolApiError::ImmediatelyDropped) =>
+					self.attempt_transaction_replacement(source, true, xt.clone()).await?,
+				Err(e) => return Err(e.into()),
 			};
 
 		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
@@ -723,7 +775,13 @@ where
 		self.view_store
 			.submit_and_watch(at, timed_source, xt)
 			.await
-			.inspect_err(|_| self.mempool.remove(xt_hash))
+			.inspect_err(|_| {
+				self.mempool.remove_transaction(&xt_hash);
+			})
+			.map(|mut outcome| {
+				self.mempool.update_transaction_priority(&outcome);
+				outcome.expect_watcher()
+			})
 	}
 
 	/// Intended to remove transactions identified by the given hashes, and any dependent
@@ -828,22 +886,16 @@ where
 	}
 }
 
-impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
-	for ForkAwareTxPool<FullChainApi<Client, Block>, Block>
+impl<ChainApi, Block> sc_transaction_pool_api::LocalTransactionPool
+	for ForkAwareTxPool<ChainApi, Block>
 where
 	Block: BlockT,
+	ChainApi: 'static + graph::ChainApi<Block = Block>,
 	<Block as BlockT>::Hash: Unpin,
-	Client: sp_api::ProvideRuntimeApi<Block>
-		+ sc_client_api::BlockBackend<Block>
-		+ sc_client_api::blockchain::HeaderBackend<Block>
-		+ sp_runtime::traits::BlockIdTo<Block>
-		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
-	Client: Send + Sync + 'static,
-	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
 {
 	type Block = Block;
-	type Hash = ExtrinsicHash<FullChainApi<Client, Block>>;
-	type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
+	type Hash = ExtrinsicHash<ChainApi>;
+	type Error = ChainApi::Error;
 
 	fn submit_local(
 		&self,
@@ -852,12 +904,29 @@ where
 	) -> Result<Self::Hash, Self::Error> {
 		log::debug!(target: LOG_TARGET, "fatp::submit_local views:{}", self.active_views_count());
 		let xt = Arc::from(xt);
-		let InsertionInfo { hash: xt_hash, .. } = self
-			.mempool
-			.extend_unwatched(TransactionSource::Local, &[xt.clone()])
-			.remove(0)?;
 
-		self.view_store.submit_local(xt).or_else(|_| Ok(xt_hash))
+		let result =
+			self.mempool.extend_unwatched(TransactionSource::Local, &[xt.clone()]).remove(0);
+
+		let insertion = match result {
+			Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
+				TransactionSource::Local,
+				false,
+				xt.clone(),
+			),
+			_ => result,
+		}?;
+
+		self.view_store
+			.submit_local(xt)
+			.inspect_err(|_| {
+				self.mempool.remove_transaction(&insertion.hash);
+			})
+			.map(|outcome| {
+				self.mempool.update_transaction_priority(&outcome);
+				outcome.hash()
+			})
+			.or_else(|_| Ok(insertion.hash))
 	}
 }
 
@@ -1109,7 +1178,11 @@ where
 			.await
 			.into_iter()
 			.zip(hashes)
-			.map(|(result, tx_hash)| result.or_else(|_| Err(tx_hash)))
+			.map(|(result, tx_hash)| {
+				result
+					.map(|outcome| self.mempool.update_transaction_priority(&outcome.into()))
+					.or_else(|_| Err(tx_hash))
+			})
 			.collect::<Vec<_>>();
 
 		let submitted_count = watched_results.len();
@@ -1131,7 +1204,7 @@ where
 			for result in watched_results {
 				if let Err(tx_hash) = result {
 					self.view_store.listener.invalidate_transactions(&[tx_hash]);
-					self.mempool.remove(tx_hash);
+					self.mempool.remove_transaction(&tx_hash);
 				}
 			}
 		}
@@ -1263,6 +1336,101 @@ where
 	fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
 		self.api.hash_and_length(xt).0
 	}
+
+	/// Attempts to find and replace a lower-priority transaction in the transaction pool with a new
+	/// one.
+	///
+	/// This asynchronous function verifies the new transaction against the most recent view. If a
+	/// transaction with a lower priority exists in the transaction pool, it is replaced with the
+	/// new transaction.
+	///
+	/// If no lower-priority transaction is found, the function returns an error indicating the
+	/// transaction was dropped immediately.
+	async fn attempt_transaction_replacement(
+		&self,
+		source: TransactionSource,
+		watched: bool,
+		xt: ExtrinsicFor<ChainApi>,
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
+		let at = self
+			.view_store
+			.most_recent_view
+			.read()
+			.ok_or(TxPoolApiError::ImmediatelyDropped)?;
+
+		let (best_view, _) = self
+			.view_store
+			.get_view_at(at, false)
+			.ok_or(TxPoolApiError::ImmediatelyDropped)?;
+
+		let (xt_hash, validated_tx) = best_view
+			.pool
+			.verify_one(
+				best_view.at.hash,
+				best_view.at.number,
+				TimedTransactionSource::from_transaction_source(source, false),
+				xt.clone(),
+				crate::graph::CheckBannedBeforeVerify::Yes,
+			)
+			.await;
+
+		let Some(priority) = validated_tx.priority() else {
+			return Err(TxPoolApiError::ImmediatelyDropped)
+		};
+
+		self.attempt_transaction_replacement_inner(xt, xt_hash, priority, source, watched)
+	}
+
+	/// Sync version of [`Self::attempt_transaction_replacement`].
+	fn attempt_transaction_replacement_sync(
+		&self,
+		source: TransactionSource,
+		watched: bool,
+		xt: ExtrinsicFor<ChainApi>,
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
+		let at = self
+			.view_store
+			.most_recent_view
+			.read()
+			.ok_or(TxPoolApiError::ImmediatelyDropped)?;
+
+		let ValidTransaction { priority, .. } = self
+			.api
+			.validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))
+			.map_err(|_| TxPoolApiError::ImmediatelyDropped)?
+			.map_err(|e| match e {
+				TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
+				TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
+			})?;
+		let xt_hash = self.hash_of(&xt);
+		self.attempt_transaction_replacement_inner(xt, xt_hash, priority, source, watched)
+	}
+
+	fn attempt_transaction_replacement_inner(
+		&self,
+		xt: ExtrinsicFor<ChainApi>,
+		tx_hash: ExtrinsicHash<ChainApi>,
+		priority: TransactionPriority,
+		source: TransactionSource,
+		watched: bool,
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
+		let insertion_info =
+			self.mempool.try_insert_with_replacement(xt, priority, source, watched)?;
+
+		for worst_hash in &insertion_info.removed {
+			log::trace!(target: LOG_TARGET, "removed: {worst_hash:?} replaced by {tx_hash:?}");
+			self.view_store
+				.listener
+				.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
+
+			self.view_store
+				.remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
+					listener.limits_enforced(&removed_tx_hash);
+				});
+		}
+
+		return Ok(insertion_info)
+	}
 }
 
 #[async_trait]
@@ -1410,7 +1578,7 @@ mod reduce_multiview_result_tests {
 	fn empty() {
 		sp_tracing::try_init_simple();
 		let input = HashMap::default();
-		let r = reduce_multiview_result::<H256, Error>(input);
+		let r = reduce_multiview_result::<H256, H256, Error>(input);
 		assert!(r.is_empty());
 	}
 
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
index 989ae4425dc..c8a4d0c72dd 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
@@ -26,7 +26,10 @@
 //!   it), while on other forks tx can be valid. Depending on which view is chosen to be cloned,
 //!   such transaction could not be present in the newly created view.
 
-use super::{metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener};
+use super::{
+	metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener,
+	view_store::ViewStoreSubmitOutcome,
+};
 use crate::{
 	common::log_xt::log_xt_trace,
 	graph,
@@ -35,15 +38,20 @@ use crate::{
 };
 use futures::FutureExt;
 use itertools::Itertools;
-use sc_transaction_pool_api::TransactionSource;
+use parking_lot::RwLock;
+use sc_transaction_pool_api::{TransactionPriority, TransactionSource};
 use sp_blockchain::HashAndNumber;
 use sp_runtime::{
 	traits::Block as BlockT,
 	transaction_validity::{InvalidTransaction, TransactionValidityError},
 };
 use std::{
+	cmp::Ordering,
 	collections::HashMap,
-	sync::{atomic, atomic::AtomicU64, Arc},
+	sync::{
+		atomic::{self, AtomicU64},
+		Arc,
+	},
 	time::Instant,
 };
 
@@ -77,6 +85,9 @@ where
 	source: TimedTransactionSource,
 	/// When the transaction was revalidated, used to periodically revalidate the mem pool buffer.
 	validated_at: AtomicU64,
+	/// Priority of transaction at some block. It is assumed it will not be changed often. None if
+	/// not known.
+	priority: RwLock<Option<TransactionPriority>>,
 	//todo: we need to add future / ready status at finalized block.
 	//If future transactions are stuck in tx_mem_pool (due to limits being hit), we need a means
 	// to replace them somehow with newly coming transactions.
@@ -101,23 +112,50 @@ where
 
 	/// Creates a new instance of wrapper for unwatched transaction.
 	fn new_unwatched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>, bytes: usize) -> Self {
-		Self {
-			watched: false,
-			tx,
-			source: TimedTransactionSource::from_transaction_source(source, true),
-			validated_at: AtomicU64::new(0),
-			bytes,
-		}
+		Self::new(false, source, tx, bytes)
 	}
 
 	/// Creates a new instance of wrapper for watched transaction.
 	fn new_watched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>, bytes: usize) -> Self {
+		Self::new(true, source, tx, bytes)
+	}
+
+	/// Creates a new instance of wrapper for a transaction with no priority.
+	fn new(
+		watched: bool,
+		source: TransactionSource,
+		tx: ExtrinsicFor<ChainApi>,
+		bytes: usize,
+	) -> Self {
+		Self::new_with_optional_priority(watched, source, tx, bytes, None)
+	}
+
+	/// Creates a new instance of wrapper for a transaction with given priority.
+	fn new_with_priority(
+		watched: bool,
+		source: TransactionSource,
+		tx: ExtrinsicFor<ChainApi>,
+		bytes: usize,
+		priority: TransactionPriority,
+	) -> Self {
+		Self::new_with_optional_priority(watched, source, tx, bytes, Some(priority))
+	}
+
+	/// Creates a new instance of wrapper for a transaction with optional priority.
+	fn new_with_optional_priority(
+		watched: bool,
+		source: TransactionSource,
+		tx: ExtrinsicFor<ChainApi>,
+		bytes: usize,
+		priority: Option<TransactionPriority>,
+	) -> Self {
 		Self {
-			watched: true,
+			watched,
 			tx,
 			source: TimedTransactionSource::from_transaction_source(source, true),
 			validated_at: AtomicU64::new(0),
 			bytes,
+			priority: priority.into(),
 		}
 	}
 
@@ -132,6 +170,11 @@ where
 	pub(crate) fn source(&self) -> TimedTransactionSource {
 		self.source.clone()
 	}
+
+	/// Returns the priority of the transaction.
+	pub(crate) fn priority(&self) -> Option<TransactionPriority> {
+		*self.priority.read()
+	}
 }
 
 impl<ChainApi, Block> Size for Arc<TxInMemPool<ChainApi, Block>>
@@ -191,11 +234,15 @@ where
 pub(super) struct InsertionInfo<Hash> {
 	pub(super) hash: Hash,
 	pub(super) source: TimedTransactionSource,
+	pub(super) removed: Vec<Hash>,
 }
 
 impl<Hash> InsertionInfo<Hash> {
 	fn new(hash: Hash, source: TimedTransactionSource) -> Self {
-		Self { hash, source }
+		Self::new_with_removed(hash, source, Default::default())
+	}
+	fn new_with_removed(hash: Hash, source: TimedTransactionSource, removed: Vec<Hash>) -> Self {
+		Self { hash, source, removed }
 	}
 }
 
@@ -279,27 +326,109 @@ where
 		&self,
 		hash: ExtrinsicHash<ChainApi>,
 		tx: TxInMemPool<ChainApi, Block>,
-	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, ChainApi::Error> {
-		let bytes = self.transactions.bytes();
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
 		let mut transactions = self.transactions.write();
+
+		let bytes = self.transactions.bytes();
+
 		let result = match (
-			!self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
+			self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
 			transactions.contains_key(&hash),
 		) {
-			(true, false) => {
+			(false, false) => {
 				let source = tx.source();
 				transactions.insert(hash, Arc::from(tx));
 				Ok(InsertionInfo::new(hash, source))
 			},
 			(_, true) =>
-				Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)).into()),
-			(false, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped.into()),
+				Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash))),
+			(true, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped),
 		};
 		log::trace!(target: LOG_TARGET, "[{:?}] mempool::try_insert: {:?}", hash, result.as_ref().map(|r| r.hash));
 
 		result
 	}
 
+	/// Attempts to insert a new transaction in the memory pool and drop some worse existing
+	/// transactions.
+	///
+	/// A "worse" transaction means transaction with lower priority, or older transaction with the
+	/// same prio.
+	///
+	/// This operation will not overflow the limit of the mempool. It means that cumulative
+	/// size of removed transactions will be equal (or greated) then size of newly inserted
+	/// transaction.
+	///
+	/// Returns a `Result` containing `InsertionInfo` if the new transaction is successfully
+	/// inserted; otherwise, returns an appropriate error indicating the failure.
+	pub(super) fn try_insert_with_replacement(
+		&self,
+		new_tx: ExtrinsicFor<ChainApi>,
+		priority: TransactionPriority,
+		source: TransactionSource,
+		watched: bool,
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
+		let (hash, length) = self.api.hash_and_length(&new_tx);
+		let new_tx = TxInMemPool::new_with_priority(watched, source, new_tx, length, priority);
+		if new_tx.bytes > self.max_transactions_total_bytes {
+			return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
+		}
+
+		let mut transactions = self.transactions.write();
+
+		if transactions.contains_key(&hash) {
+			return Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)));
+		}
+
+		let mut sorted = transactions
+			.iter()
+			.filter_map(|(h, v)| v.priority().map(|_| (*h, v.clone())))
+			.collect::<Vec<_>>();
+
+		// When pushing higher prio transaction, we need to find a number of lower prio txs, such
+		// that the sum of their bytes is ge then size of new tx. Otherwise we could overflow size
+		// limits. Naive way to do it - rev-sort by priority and eat the tail.
+
+		// reverse (oldest, lowest prio last)
+		sorted.sort_by(|(_, a), (_, b)| match b.priority().cmp(&a.priority()) {
+			Ordering::Equal => match (a.source.timestamp, b.source.timestamp) {
+				(Some(a), Some(b)) => b.cmp(&a),
+				_ => Ordering::Equal,
+			},
+			ordering => ordering,
+		});
+
+		let mut total_size_removed = 0usize;
+		let mut to_be_removed = vec![];
+		let free_bytes = self.max_transactions_total_bytes - self.transactions.bytes();
+
+		loop {
+			let Some((worst_hash, worst_tx)) = sorted.pop() else {
+				return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
+			};
+
+			if worst_tx.priority() >= new_tx.priority() {
+				return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
+			}
+
+			total_size_removed += worst_tx.bytes;
+			to_be_removed.push(worst_hash);
+
+			if free_bytes + total_size_removed >= new_tx.bytes {
+				break;
+			}
+		}
+
+		let source = new_tx.source();
+		transactions.insert(hash, Arc::from(new_tx));
+		for worst_hash in &to_be_removed {
+			transactions.remove(worst_hash);
+		}
+		debug_assert!(!self.is_limit_exceeded(transactions.len(), self.transactions.bytes()));
+
+		Ok(InsertionInfo::new_with_removed(hash, source, to_be_removed))
+	}
+
 	/// Adds a new unwatched transactions to the internal buffer not exceeding the limit.
 	///
 	/// Returns the vector of results for each transaction, the order corresponds to the input
@@ -308,7 +437,8 @@ where
 		&self,
 		source: TransactionSource,
 		xts: &[ExtrinsicFor<ChainApi>],
-	) -> Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, ChainApi::Error>> {
+	) -> Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error>>
+	{
 		let result = xts
 			.iter()
 			.map(|xt| {
@@ -325,20 +455,11 @@ where
 		&self,
 		source: TransactionSource,
 		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, ChainApi::Error> {
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
 		let (hash, length) = self.api.hash_and_length(&xt);
 		self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length))
 	}
 
-	/// Removes transaction from the memory pool which are specified by the given list of hashes.
-	pub(super) async fn remove_dropped_transaction(
-		&self,
-		dropped: &ExtrinsicHash<ChainApi>,
-	) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
-		log::debug!(target: LOG_TARGET, "[{:?}] mempool::remove_dropped_transaction", dropped);
-		self.transactions.write().remove(dropped)
-	}
-
 	/// Clones and returns a `HashMap` of references to all unwatched transactions in the memory
 	/// pool.
 	pub(super) fn clone_unwatched(
@@ -362,9 +483,13 @@ where
 			.collect::<HashMap<_, _>>()
 	}
 
-	/// Removes a transaction from the memory pool based on a given hash.
-	pub(super) fn remove(&self, hash: ExtrinsicHash<ChainApi>) {
-		let _ = self.transactions.write().remove(&hash);
+	/// Removes a transaction with given hash from the memory pool.
+	pub(super) fn remove_transaction(
+		&self,
+		hash: &ExtrinsicHash<ChainApi>,
+	) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
+		log::debug!(target: LOG_TARGET, "[{hash:?}] mempool::remove_transaction");
+		self.transactions.write().remove(hash)
 	}
 
 	/// Revalidates a batch of transactions against the provided finalized block.
@@ -462,6 +587,17 @@ where
 		});
 		self.listener.invalidate_transactions(&invalid_hashes);
 	}
+
+	/// Updates the priority of transaction stored in mempool using provided view_store submission
+	/// outcome.
+	pub(super) fn update_transaction_priority(&self, outcome: &ViewStoreSubmitOutcome<ChainApi>) {
+		outcome.priority().map(|priority| {
+			self.transactions
+				.write()
+				.get_mut(&outcome.hash())
+				.map(|p| *p.priority.write() = Some(priority))
+		});
+	}
 }
 
 #[cfg(test)]
@@ -583,6 +719,9 @@ mod tx_mem_pool_tests {
 		assert_eq!(mempool.unwatched_and_watched_count(), (10, 5));
 	}
 
+	/// size of large extrinsic
+	const LARGE_XT_SIZE: usize = 1129;
+
 	fn large_uxt(x: usize) -> Extrinsic {
 		ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build()
 	}
@@ -592,8 +731,7 @@ mod tx_mem_pool_tests {
 		sp_tracing::try_init_simple();
 		let max = 10;
 		let api = Arc::from(TestApi::default());
-		//size of large extrinsic is: 1129
-		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * 1129);
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
 
 		let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
 
@@ -617,4 +755,200 @@ mod tx_mem_pool_tests {
 			sc_transaction_pool_api::error::Error::ImmediatelyDropped
 		));
 	}
+
+	#[test]
+	fn replacing_txs_works_for_same_tx_size() {
+		sp_tracing::try_init_simple();
+		let max = 10;
+		let api = Arc::from(TestApi::default());
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
+
+		let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
+
+		let low_prio = 0u64;
+		let hi_prio = u64::MAX;
+
+		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
+		let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts
+			.iter()
+			.map(|t| {
+				let h = api.hash_and_length(t).0;
+				(ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
+			})
+			.unzip();
+
+		let results = mempool.extend_unwatched(TransactionSource::External, &xts);
+		assert!(results.iter().all(Result::is_ok));
+		assert_eq!(mempool.bytes(), total_xts_bytes);
+
+		submit_outcomes
+			.into_iter()
+			.for_each(|o| mempool.update_transaction_priority(&o));
+
+		let xt = Arc::from(large_uxt(98));
+		let hash = api.hash_and_length(&xt).0;
+		let result = mempool
+			.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false)
+			.unwrap();
+
+		assert_eq!(result.hash, hash);
+		assert_eq!(result.removed, hashes[0..1]);
+	}
+
+	#[test]
+	fn replacing_txs_removes_proper_size_of_txs() {
+		sp_tracing::try_init_simple();
+		let max = 10;
+		let api = Arc::from(TestApi::default());
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
+
+		let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
+
+		let low_prio = 0u64;
+		let hi_prio = u64::MAX;
+
+		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
+		let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts
+			.iter()
+			.map(|t| {
+				let h = api.hash_and_length(t).0;
+				(ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
+			})
+			.unzip();
+
+		let results = mempool.extend_unwatched(TransactionSource::External, &xts);
+		assert!(results.iter().all(Result::is_ok));
+		assert_eq!(mempool.bytes(), total_xts_bytes);
+		assert_eq!(total_xts_bytes, max * LARGE_XT_SIZE);
+
+		submit_outcomes
+			.into_iter()
+			.for_each(|o| mempool.update_transaction_priority(&o));
+
+		//this one should drop 2 xts (size: 1130):
+		let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 1025]).build());
+		let (hash, length) = api.hash_and_length(&xt);
+		assert_eq!(length, 1130);
+		let result = mempool
+			.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false)
+			.unwrap();
+
+		assert_eq!(result.hash, hash);
+		assert_eq!(result.removed, hashes[0..2]);
+	}
+
+	#[test]
+	fn replacing_txs_removes_proper_size_and_prios() {
+		sp_tracing::try_init_simple();
+		const COUNT: usize = 10;
+		let api = Arc::from(TestApi::default());
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
+
+		let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
+
+		let hi_prio = u64::MAX;
+
+		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
+		let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts
+			.iter()
+			.enumerate()
+			.map(|(prio, t)| {
+				let h = api.hash_and_length(t).0;
+				(ViewStoreSubmitOutcome::new(h, Some((COUNT - prio).try_into().unwrap())), h)
+			})
+			.unzip();
+
+		let results = mempool.extend_unwatched(TransactionSource::External, &xts);
+		assert!(results.iter().all(Result::is_ok));
+		assert_eq!(mempool.bytes(), total_xts_bytes);
+
+		submit_outcomes
+			.into_iter()
+			.for_each(|o| mempool.update_transaction_priority(&o));
+
+		//this one should drop 3 xts (each of size 1129)
+		let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
+		let (hash, length) = api.hash_and_length(&xt);
+		// overhead is 105, thus length: 105 + 2154
+		assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
+		let result = mempool
+			.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false)
+			.unwrap();
+
+		assert_eq!(result.hash, hash);
+		assert!(result.removed.iter().eq(hashes[COUNT - 3..COUNT].iter().rev()));
+	}
+
+	#[test]
+	fn replacing_txs_skips_lower_prio_tx() {
+		sp_tracing::try_init_simple();
+		const COUNT: usize = 10;
+		let api = Arc::from(TestApi::default());
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
+
+		let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
+
+		let hi_prio = 100u64;
+		let low_prio = 10u64;
+
+		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
+		let submit_outcomes = xts
+			.iter()
+			.map(|t| {
+				let h = api.hash_and_length(t).0;
+				ViewStoreSubmitOutcome::new(h, Some(hi_prio))
+			})
+			.collect::<Vec<_>>();
+
+		let results = mempool.extend_unwatched(TransactionSource::External, &xts);
+		assert!(results.iter().all(Result::is_ok));
+		assert_eq!(mempool.bytes(), total_xts_bytes);
+
+		submit_outcomes
+			.into_iter()
+			.for_each(|o| mempool.update_transaction_priority(&o));
+
+		let xt = Arc::from(large_uxt(98));
+		let result =
+			mempool.try_insert_with_replacement(xt, low_prio, TransactionSource::External, false);
+
+		// lower prio tx is rejected immediately
+		assert!(matches!(
+			result.unwrap_err(),
+			sc_transaction_pool_api::error::Error::ImmediatelyDropped
+		));
+	}
+
+	#[test]
+	fn replacing_txs_is_skipped_if_prios_are_not_set() {
+		sp_tracing::try_init_simple();
+		const COUNT: usize = 10;
+		let api = Arc::from(TestApi::default());
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
+
+		let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
+
+		let hi_prio = u64::MAX;
+
+		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
+
+		let results = mempool.extend_unwatched(TransactionSource::External, &xts);
+		assert!(results.iter().all(Result::is_ok));
+		assert_eq!(mempool.bytes(), total_xts_bytes);
+
+		//this one could drop 3 xts (each of size 1129)
+		let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
+		let length = api.hash_and_length(&xt).1;
+		// overhead is 105, thus length: 105 + 2154
+		assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
+
+		let result =
+			mempool.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false);
+
+		// we did not update priorities (update_transaction_priority was not called):
+		assert!(matches!(
+			result.unwrap_err(),
+			sc_transaction_pool_api::error::Error::ImmediatelyDropped
+		));
+	}
 }
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
index 3cbb8fa4871..a35d68120a3 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
@@ -28,7 +28,7 @@ use crate::{
 	common::log_xt::log_xt_trace,
 	graph::{
 		self, base_pool::TimedTransactionSource, watcher::Watcher, ExtrinsicFor, ExtrinsicHash,
-		IsValidator, ValidatedTransaction, ValidatedTransactionFor,
+		IsValidator, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor,
 	},
 	LOG_TARGET,
 };
@@ -158,7 +158,7 @@ where
 	pub(super) async fn submit_many(
 		&self,
 		xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)>,
-	) -> Vec<Result<ExtrinsicHash<ChainApi>, ChainApi::Error>> {
+	) -> Vec<Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error>> {
 		if log::log_enabled!(target: LOG_TARGET, log::Level::Trace) {
 			let xts = xts.into_iter().collect::<Vec<_>>();
 			log_xt_trace!(target: LOG_TARGET, xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0), "[{:?}] view::submit_many at:{}", self.at.hash);
@@ -173,7 +173,7 @@ where
 		&self,
 		source: TimedTransactionSource,
 		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<Watcher<ExtrinsicHash<ChainApi>, ExtrinsicHash<ChainApi>>, ChainApi::Error> {
+	) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
 		log::trace!(target: LOG_TARGET, "[{:?}] view::submit_and_watch at:{}", self.pool.validated_pool().api().hash_and_length(&xt).0, self.at.hash);
 		self.pool.submit_and_watch(&self.at, source, xt).await
 	}
@@ -182,7 +182,7 @@ where
 	pub(super) fn submit_local(
 		&self,
 		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
+	) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
 		let (hash, length) = self.pool.validated_pool().api().hash_and_length(&xt);
 		log::trace!(target: LOG_TARGET, "[{:?}] view::submit_local at:{}", hash, self.at.hash);
 
@@ -460,4 +460,18 @@ where
 		const IGNORE_BANNED: bool = false;
 		self.pool.validated_pool().check_is_known(tx_hash, IGNORE_BANNED).is_err()
 	}
+
+	/// Removes the whole transaction subtree from the inner pool.
+	///
+	/// Refer to [`crate::graph::ValidatedPool::remove_subtree`] for more details.
+	pub fn remove_subtree<F>(
+		&self,
+		tx_hash: ExtrinsicHash<ChainApi>,
+		listener_action: F,
+	) -> Vec<ExtrinsicHash<ChainApi>>
+	where
+		F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>),
+	{
+		self.pool.validated_pool().remove_subtree(tx_hash, listener_action)
+	}
 }
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
index a06c051f0a7..43ed5bbf886 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
@@ -27,7 +27,7 @@ use crate::{
 	graph::{
 		self,
 		base_pool::{TimedTransactionSource, Transaction},
-		ExtrinsicFor, ExtrinsicHash, TransactionFor,
+		BaseSubmitOutcome, ExtrinsicFor, ExtrinsicHash, TransactionFor, ValidatedPoolSubmitOutcome,
 	},
 	ReadyIteratorFor, LOG_TARGET,
 };
@@ -38,20 +38,18 @@ use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus};
 use sp_blockchain::TreeRoute;
 use sp_runtime::{generic::BlockId, traits::Block as BlockT};
 use std::{
-	collections::{hash_map::Entry, HashMap},
+	collections::{hash_map::Entry, HashMap, HashSet},
 	sync::Arc,
 	time::Instant,
 };
 
-/// Helper struct to keep the context for transaction replacements.
+/// Helper struct to maintain the context for pending transaction submission, executed for
+/// newly inserted views.
 #[derive(Clone)]
-struct PendingTxReplacement<ChainApi>
+struct PendingTxSubmission<ChainApi>
 where
 	ChainApi: graph::ChainApi,
 {
-	/// Indicates if the new transaction was already submitted to all the views in the view_store.
-	/// If true, it can be removed after inserting any new view.
-	processed: bool,
 	/// New transaction replacing the old one.
 	xt: ExtrinsicFor<ChainApi>,
 	/// Source of the transaction.
@@ -60,13 +58,84 @@ where
 	watched: bool,
 }
 
-impl<ChainApi> PendingTxReplacement<ChainApi>
+/// Helper type representing the callback allowing to trigger per-transaction events on
+/// `ValidatedPool`'s listener.
+type RemovalListener<ChainApi> =
+	Arc<dyn Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>) + Send + Sync>;
+
+/// Helper struct to maintain the context for pending transaction removal, executed for
+/// newly inserted views.
+struct PendingTxRemoval<ChainApi>
+where
+	ChainApi: graph::ChainApi,
+{
+	/// Hash of the transaction that will be removed,
+	xt_hash: ExtrinsicHash<ChainApi>,
+	/// Action that shall be executed on underlying `ValidatedPool`'s listener.
+	listener_action: RemovalListener<ChainApi>,
+}
+
+/// This enum represents an action that should be executed on the newly built
+/// view before this view is inserted into the view store.
+enum PreInsertAction<ChainApi>
+where
+	ChainApi: graph::ChainApi,
+{
+	/// Represents the action of submitting a new transaction. Intended to use to handle usurped
+	/// transactions.
+	SubmitTx(PendingTxSubmission<ChainApi>),
+
+	/// Represents the action of removing a subtree of transactions.
+	RemoveSubtree(PendingTxRemoval<ChainApi>),
+}
+
+/// Represents a task awaiting execution, to be performed immediately prior to the view insertion
+/// into the view store.
+struct PendingPreInsertTask<ChainApi>
+where
+	ChainApi: graph::ChainApi,
+{
+	/// The action to be applied when inserting a new view.
+	action: PreInsertAction<ChainApi>,
+	/// Indicates if the action was already applied to all the views in the view_store.
+	/// If true, it can be removed after inserting any new view.
+	processed: bool,
+}
+
+impl<ChainApi> PendingPreInsertTask<ChainApi>
 where
 	ChainApi: graph::ChainApi,
 {
-	/// Creates new unprocessed instance of pending transaction replacement.
-	fn new(xt: ExtrinsicFor<ChainApi>, source: TimedTransactionSource, watched: bool) -> Self {
-		Self { processed: false, xt, source, watched }
+	/// Creates new unprocessed instance of pending transaction submission.
+	fn new_submission_action(
+		xt: ExtrinsicFor<ChainApi>,
+		source: TimedTransactionSource,
+		watched: bool,
+	) -> Self {
+		Self {
+			processed: false,
+			action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source, watched }),
+		}
+	}
+
+	/// Creates new unprocessed instance of pending transaction removal.
+	fn new_removal_action(
+		xt_hash: ExtrinsicHash<ChainApi>,
+		listener: RemovalListener<ChainApi>,
+	) -> Self {
+		Self {
+			processed: false,
+			action: PreInsertAction::RemoveSubtree(PendingTxRemoval {
+				xt_hash,
+				listener_action: listener,
+			}),
+		}
+	}
+
+	/// Marks a task as done for every view present in view store. Basically means that can be
+	/// removed on new view insertion.
+	fn mark_processed(&mut self) {
+		self.processed = true;
 	}
 }
 
@@ -100,9 +169,20 @@ where
 	/// notifcication threads. It is meant to assure that replaced transaction is also removed from
 	/// newly built views in maintain process.
 	///
-	/// The map's key is hash of replaced extrinsic.
-	pending_txs_replacements:
-		RwLock<HashMap<ExtrinsicHash<ChainApi>, PendingTxReplacement<ChainApi>>>,
+	/// The map's key is hash of actionable extrinsic (to avoid duplicated entries).
+	pending_txs_tasks: RwLock<HashMap<ExtrinsicHash<ChainApi>, PendingPreInsertTask<ChainApi>>>,
+}
+
+/// Type alias to outcome of submission to `ViewStore`.
+pub(super) type ViewStoreSubmitOutcome<ChainApi> =
+	BaseSubmitOutcome<ChainApi, TxStatusStream<ChainApi>>;
+
+impl<ChainApi: graph::ChainApi> From<ValidatedPoolSubmitOutcome<ChainApi>>
+	for ViewStoreSubmitOutcome<ChainApi>
+{
+	fn from(value: ValidatedPoolSubmitOutcome<ChainApi>) -> Self {
+		Self::new(value.hash(), value.priority())
+	}
 }
 
 impl<ChainApi, Block> ViewStore<ChainApi, Block>
@@ -124,7 +204,7 @@ where
 			listener,
 			most_recent_view: RwLock::from(None),
 			dropped_stream_controller,
-			pending_txs_replacements: Default::default(),
+			pending_txs_tasks: Default::default(),
 		}
 	}
 
@@ -132,7 +212,7 @@ where
 	pub(super) async fn submit(
 		&self,
 		xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)> + Clone,
-	) -> HashMap<Block::Hash, Vec<Result<ExtrinsicHash<ChainApi>, ChainApi::Error>>> {
+	) -> HashMap<Block::Hash, Vec<Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error>>> {
 		let submit_futures = {
 			let active_views = self.active_views.read();
 			active_views
@@ -140,7 +220,16 @@ where
 				.map(|(_, view)| {
 					let view = view.clone();
 					let xts = xts.clone();
-					async move { (view.at.hash, view.submit_many(xts).await) }
+					async move {
+						(
+							view.at.hash,
+							view.submit_many(xts)
+								.await
+								.into_iter()
+								.map(|r| r.map(Into::into))
+								.collect::<Vec<_>>(),
+						)
+					}
 				})
 				.collect::<Vec<_>>()
 		};
@@ -153,7 +242,7 @@ where
 	pub(super) fn submit_local(
 		&self,
 		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
+	) -> Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error> {
 		let active_views = self
 			.active_views
 			.read()
@@ -168,12 +257,14 @@ where
 			.map(|view| view.submit_local(xt.clone()))
 			.find_or_first(Result::is_ok);
 
-		if let Some(Err(err)) = result {
-			log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err);
-			return Err(err)
-		};
-
-		Ok(tx_hash)
+		match result {
+			Some(Err(err)) => {
+				log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err);
+				Err(err)
+			},
+			None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None)),
+			Some(Ok(r)) => Ok(r.into()),
+		}
 	}
 
 	/// Import a single extrinsic and starts to watch its progress in the pool.
@@ -188,7 +279,7 @@ where
 		_at: Block::Hash,
 		source: TimedTransactionSource,
 		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<TxStatusStream<ChainApi>, ChainApi::Error> {
+	) -> Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error> {
 		let tx_hash = self.api.hash_and_length(&xt).0;
 		let Some(external_watcher) = self.listener.create_external_watcher_for_tx(tx_hash) else {
 			return Err(PoolError::AlreadyImported(Box::new(tx_hash)).into())
@@ -203,13 +294,13 @@ where
 					let source = source.clone();
 					async move {
 						match view.submit_and_watch(source, xt).await {
-							Ok(watcher) => {
+							Ok(mut result) => {
 								self.listener.add_view_watcher_for_tx(
 									tx_hash,
 									view.at.hash,
-									watcher.into_stream().boxed(),
+									result.expect_watcher().into_stream().boxed(),
 								);
-								Ok(())
+								Ok(result)
 							},
 							Err(e) => Err(e),
 						}
@@ -217,17 +308,20 @@ where
 				})
 				.collect::<Vec<_>>()
 		};
-		let maybe_error = futures::future::join_all(submit_and_watch_futures)
+		let result = futures::future::join_all(submit_and_watch_futures)
 			.await
 			.into_iter()
 			.find_or_first(Result::is_ok);
 
-		if let Some(Err(err)) = maybe_error {
-			log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err);
-			return Err(err);
-		};
-
-		Ok(external_watcher)
+		match result {
+			Some(Err(err)) => {
+				log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err);
+				return Err(err);
+			},
+			Some(Ok(result)) =>
+				Ok(ViewStoreSubmitOutcome::from(result).with_watcher(external_watcher)),
+			None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None).with_watcher(external_watcher)),
+		}
 	}
 
 	/// Returns the pool status for every active view.
@@ -575,8 +669,12 @@ where
 		replaced: ExtrinsicHash<ChainApi>,
 		watched: bool,
 	) {
-		if let Entry::Vacant(entry) = self.pending_txs_replacements.write().entry(replaced) {
-			entry.insert(PendingTxReplacement::new(xt.clone(), source.clone(), watched));
+		if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(replaced) {
+			entry.insert(PendingPreInsertTask::new_submission_action(
+				xt.clone(),
+				source.clone(),
+				watched,
+			));
 		} else {
 			return
 		};
@@ -586,8 +684,8 @@ where
 
 		self.replace_transaction_in_views(source, xt, xt_hash, replaced, watched).await;
 
-		if let Some(replacement) = self.pending_txs_replacements.write().get_mut(&replaced) {
-			replacement.processed = true;
+		if let Some(replacement) = self.pending_txs_tasks.write().get_mut(&replaced) {
+			replacement.mark_processed();
 		}
 	}
 
@@ -596,18 +694,25 @@ where
 	/// After application, all already processed replacements are removed.
 	async fn apply_pending_tx_replacements(&self, view: Arc<View<ChainApi>>) {
 		let mut futures = vec![];
-		for replacement in self.pending_txs_replacements.read().values() {
-			let xt_hash = self.api.hash_and_length(&replacement.xt).0;
-			futures.push(self.replace_transaction_in_view(
-				view.clone(),
-				replacement.source.clone(),
-				replacement.xt.clone(),
-				xt_hash,
-				replacement.watched,
-			));
+		for replacement in self.pending_txs_tasks.read().values() {
+			match replacement.action {
+				PreInsertAction::SubmitTx(ref submission) => {
+					let xt_hash = self.api.hash_and_length(&submission.xt).0;
+					futures.push(self.replace_transaction_in_view(
+						view.clone(),
+						submission.source.clone(),
+						submission.xt.clone(),
+						xt_hash,
+						submission.watched,
+					));
+				},
+				PreInsertAction::RemoveSubtree(ref removal) => {
+					view.remove_subtree(removal.xt_hash, &*removal.listener_action);
+				},
+			}
 		}
 		let _results = futures::future::join_all(futures).await;
-		self.pending_txs_replacements.write().retain(|_, r| r.processed);
+		self.pending_txs_tasks.write().retain(|_, r| r.processed);
 	}
 
 	/// Submits `xt` to the given view.
@@ -623,11 +728,11 @@ where
 	) {
 		if watched {
 			match view.submit_and_watch(source, xt).await {
-				Ok(watcher) => {
+				Ok(mut result) => {
 					self.listener.add_view_watcher_for_tx(
 						xt_hash,
 						view.at.hash,
-						watcher.into_stream().boxed(),
+						result.expect_watcher().into_stream().boxed(),
 					);
 				},
 				Err(e) => {
@@ -690,4 +795,58 @@ where
 		};
 		let _results = futures::future::join_all(submit_futures).await;
 	}
+
+	/// Removes a transaction subtree from every view in the view_store, starting from the given
+	/// transaction hash.
+	///
+	/// This function traverses the dependency graph of transactions and removes the specified
+	/// transaction along with all its descendant transactions from every view.
+	///
+	/// A `listener_action` callback function is invoked for every transaction that is removed,
+	/// providing a reference to the pool's listener and the hash of the removed transaction. This
+	/// allows to trigger the required events. Note that listener may be called multiple times for
+	/// the same hash.
+	///
+	/// Function will also schedule view pre-insertion actions to ensure that transactions will be
+	/// removed from newly created view.
+	///
+	/// Returns a vector containing the hashes of all removed transactions, including the root
+	/// transaction specified by `tx_hash`. Vector contains only unique hashes.
+	pub(super) fn remove_transaction_subtree<F>(
+		&self,
+		xt_hash: ExtrinsicHash<ChainApi>,
+		listener_action: F,
+	) -> Vec<ExtrinsicHash<ChainApi>>
+	where
+		F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>)
+			+ Clone
+			+ Send
+			+ Sync
+			+ 'static,
+	{
+		if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(xt_hash) {
+			entry.insert(PendingPreInsertTask::new_removal_action(
+				xt_hash,
+				Arc::from(listener_action.clone()),
+			));
+		};
+
+		let mut seen = HashSet::new();
+
+		let removed = self
+			.active_views
+			.read()
+			.iter()
+			.chain(self.inactive_views.read().iter())
+			.filter(|(_, view)| view.is_imported(&xt_hash))
+			.flat_map(|(_, view)| view.remove_subtree(xt_hash, &listener_action))
+			.filter(|xt_hash| seen.insert(*xt_hash))
+			.collect();
+
+		if let Some(removal_action) = self.pending_txs_tasks.write().get_mut(&xt_hash) {
+			removal_action.mark_processed();
+		}
+
+		removed
+	}
 }
diff --git a/substrate/client/transaction-pool/src/graph/base_pool.rs b/substrate/client/transaction-pool/src/graph/base_pool.rs
index 04eaa998f42..3b4afc88b78 100644
--- a/substrate/client/transaction-pool/src/graph/base_pool.rs
+++ b/substrate/client/transaction-pool/src/graph/base_pool.rs
@@ -453,27 +453,29 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
 
 		while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
 			// find the worst transaction
-			let worst = self.ready.fold::<TransactionRef<Hash, Ex>, _>(|worst, current| {
-				let transaction = &current.transaction;
-				worst
-					.map(|worst| {
-						// Here we don't use `TransactionRef`'s ordering implementation because
-						// while it prefers priority like need here, it also prefers older
-						// transactions for inclusion purposes and limit enforcement needs to prefer
-						// newer transactions instead and drop the older ones.
-						match worst.transaction.priority.cmp(&transaction.transaction.priority) {
-							Ordering::Less => worst,
-							Ordering::Equal =>
-								if worst.insertion_id > transaction.insertion_id {
-									transaction.clone()
-								} else {
-									worst
-								},
-							Ordering::Greater => transaction.clone(),
-						}
-					})
-					.or_else(|| Some(transaction.clone()))
-			});
+			let worst =
+				self.ready.fold::<Option<TransactionRef<Hash, Ex>>, _>(None, |worst, current| {
+					let transaction = &current.transaction;
+					worst
+						.map(|worst| {
+							// Here we don't use `TransactionRef`'s ordering implementation because
+							// while it prefers priority like need here, it also prefers older
+							// transactions for inclusion purposes and limit enforcement needs to
+							// prefer newer transactions instead and drop the older ones.
+							match worst.transaction.priority.cmp(&transaction.transaction.priority)
+							{
+								Ordering::Less => worst,
+								Ordering::Equal =>
+									if worst.insertion_id > transaction.insertion_id {
+										transaction.clone()
+									} else {
+										worst
+									},
+								Ordering::Greater => transaction.clone(),
+							}
+						})
+						.or_else(|| Some(transaction.clone()))
+				});
 
 			if let Some(worst) = worst {
 				removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs
index 41daf5491f7..7b09ee4c640 100644
--- a/substrate/client/transaction-pool/src/graph/listener.rs
+++ b/substrate/client/transaction-pool/src/graph/listener.rs
@@ -126,8 +126,8 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H
 	}
 
 	/// Transaction was dropped from the pool because of enforcing the limit.
-	pub fn limit_enforced(&mut self, tx: &H) {
-		trace!(target: LOG_TARGET, "[{:?}] Dropped (limit enforced)", tx);
+	pub fn limits_enforced(&mut self, tx: &H) {
+		trace!(target: LOG_TARGET, "[{:?}] Dropped (limits enforced)", tx);
 		self.fire(tx, |watcher| watcher.limit_enforced());
 
 		if let Some(ref sink) = self.dropped_by_limits_sink {
diff --git a/substrate/client/transaction-pool/src/graph/mod.rs b/substrate/client/transaction-pool/src/graph/mod.rs
index d93898b1b22..2114577f4de 100644
--- a/substrate/client/transaction-pool/src/graph/mod.rs
+++ b/substrate/client/transaction-pool/src/graph/mod.rs
@@ -41,6 +41,12 @@ pub use self::pool::{
 	BlockHash, ChainApi, ExtrinsicFor, ExtrinsicHash, NumberFor, Options, Pool, RawExtrinsicFor,
 	TransactionFor, ValidatedTransactionFor,
 };
-pub use validated_pool::{IsValidator, ValidatedTransaction};
+pub use validated_pool::{
+	BaseSubmitOutcome, IsValidator, Listener, ValidatedPoolSubmitOutcome, ValidatedTransaction,
+};
 
+pub(crate) use self::pool::CheckBannedBeforeVerify;
 pub(crate) use listener::DroppedByLimitsEvent;
+
+#[cfg(doc)]
+pub(crate) use validated_pool::ValidatedPool;
diff --git a/substrate/client/transaction-pool/src/graph/pool.rs b/substrate/client/transaction-pool/src/graph/pool.rs
index 4c0ace0b1c7..403712662ad 100644
--- a/substrate/client/transaction-pool/src/graph/pool.rs
+++ b/substrate/client/transaction-pool/src/graph/pool.rs
@@ -37,7 +37,7 @@ use std::{
 use super::{
 	base_pool as base,
 	validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
-	watcher::Watcher,
+	ValidatedPoolSubmitOutcome,
 };
 
 /// Modification notification event stream type;
@@ -168,7 +168,7 @@ impl Options {
 /// Should we check that the transaction is banned
 /// in the pool, before we verify it?
 #[derive(Copy, Clone)]
-enum CheckBannedBeforeVerify {
+pub(crate) enum CheckBannedBeforeVerify {
 	Yes,
 	No,
 }
@@ -204,7 +204,7 @@ impl<B: ChainApi> Pool<B> {
 		&self,
 		at: &HashAndNumber<B::Block>,
 		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
-	) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
+	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
 		let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::Yes).await;
 		self.validated_pool.submit(validated_transactions.into_values())
 	}
@@ -216,7 +216,7 @@ impl<B: ChainApi> Pool<B> {
 		&self,
 		at: &HashAndNumber<B::Block>,
 		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
-	) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
+	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
 		let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::No).await;
 		self.validated_pool.submit(validated_transactions.into_values())
 	}
@@ -227,7 +227,7 @@ impl<B: ChainApi> Pool<B> {
 		at: &HashAndNumber<B::Block>,
 		source: base::TimedTransactionSource,
 		xt: ExtrinsicFor<B>,
-	) -> Result<ExtrinsicHash<B>, B::Error> {
+	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
 		let res = self.submit_at(at, std::iter::once((source, xt))).await.pop();
 		res.expect("One extrinsic passed; one result returned; qed")
 	}
@@ -238,7 +238,7 @@ impl<B: ChainApi> Pool<B> {
 		at: &HashAndNumber<B::Block>,
 		source: base::TimedTransactionSource,
 		xt: ExtrinsicFor<B>,
-	) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
+	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
 		let (_, tx) = self
 			.verify_one(at.hash, at.number, source, xt, CheckBannedBeforeVerify::Yes)
 			.await;
@@ -432,7 +432,7 @@ impl<B: ChainApi> Pool<B> {
 	}
 
 	/// Returns future that validates single transaction at given block.
-	async fn verify_one(
+	pub(crate) async fn verify_one(
 		&self,
 		block_hash: <B::Block as BlockT>::Hash,
 		block_number: NumberFor<B>,
@@ -539,6 +539,7 @@ mod tests {
 				.into(),
 			),
 		)
+		.map(|outcome| outcome.hash())
 		.unwrap();
 
 		// then
@@ -567,7 +568,10 @@ mod tests {
 
 		// when
 		let txs = txs.into_iter().map(|x| (SOURCE, Arc::from(x))).collect::<Vec<_>>();
-		let hashes = block_on(pool.submit_at(&api.expect_hash_and_number(0), txs));
+		let hashes = block_on(pool.submit_at(&api.expect_hash_and_number(0), txs))
+			.into_iter()
+			.map(|r| r.map(|o| o.hash()))
+			.collect::<Vec<_>>();
 		log::debug!("--> {hashes:#?}");
 
 		// then
@@ -591,7 +595,8 @@ mod tests {
 
 		// when
 		pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
-		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()));
+		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
+			.map(|o| o.hash());
 		assert_eq!(pool.validated_pool().status().ready, 0);
 		assert_eq!(pool.validated_pool().status().future, 0);
 
@@ -614,7 +619,8 @@ mod tests {
 		let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
 
 		// when
-		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()));
+		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
+			.map(|o| o.hash());
 
 		// then
 		assert_matches!(res.unwrap_err(), error::Error::Unactionable);
@@ -642,7 +648,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.hash();
 			let hash1 = block_on(
 				pool.submit_one(
 					&han_of_block0,
@@ -656,7 +663,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.hash();
 			// future doesn't count
 			let _hash = block_on(
 				pool.submit_one(
@@ -671,7 +679,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.hash();
 
 			assert_eq!(pool.validated_pool().status().ready, 2);
 			assert_eq!(pool.validated_pool().status().future, 1);
@@ -704,7 +713,8 @@ mod tests {
 				.into(),
 			),
 		)
-		.unwrap();
+		.unwrap()
+		.hash();
 		let hash2 = block_on(
 			pool.submit_one(
 				&han_of_block0,
@@ -718,7 +728,8 @@ mod tests {
 				.into(),
 			),
 		)
-		.unwrap();
+		.unwrap()
+		.hash();
 		let hash3 = block_on(
 			pool.submit_one(
 				&han_of_block0,
@@ -732,7 +743,8 @@ mod tests {
 				.into(),
 			),
 		)
-		.unwrap();
+		.unwrap()
+		.hash();
 
 		// when
 		pool.validated_pool.clear_stale(&api.expect_hash_and_number(5));
@@ -764,7 +776,8 @@ mod tests {
 				.into(),
 			),
 		)
-		.unwrap();
+		.unwrap()
+		.hash();
 
 		// when
 		block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![vec![0]], vec![hash1]));
@@ -792,8 +805,9 @@ mod tests {
 		let api = Arc::new(TestApi::default());
 		let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
 
-		let hash1 =
-			block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into())).unwrap();
+		let hash1 = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
+			.unwrap()
+			.hash();
 		assert_eq!(pool.validated_pool().status().future, 1);
 
 		// when
@@ -810,7 +824,8 @@ mod tests {
 				.into(),
 			),
 		)
-		.unwrap();
+		.unwrap()
+		.hash();
 
 		// then
 		assert_eq!(pool.validated_pool().status().future, 1);
@@ -842,6 +857,7 @@ mod tests {
 				.into(),
 			),
 		)
+		.map(|o| o.hash())
 		.unwrap_err();
 
 		// then
@@ -868,6 +884,7 @@ mod tests {
 				.into(),
 			),
 		)
+		.map(|o| o.hash())
 		.unwrap_err();
 
 		// then
@@ -896,7 +913,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 1);
 			assert_eq!(pool.validated_pool().status().future, 0);
 
@@ -933,7 +951,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 1);
 			assert_eq!(pool.validated_pool().status().future, 0);
 
@@ -972,7 +991,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 0);
 			assert_eq!(pool.validated_pool().status().future, 1);
 
@@ -1011,7 +1031,8 @@ mod tests {
 			});
 			let watcher =
 				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
-					.unwrap();
+					.unwrap()
+					.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 1);
 
 			// when
@@ -1036,7 +1057,8 @@ mod tests {
 			});
 			let watcher =
 				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
-					.unwrap();
+					.unwrap()
+					.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 1);
 
 			// when
@@ -1069,7 +1091,8 @@ mod tests {
 			});
 			let watcher =
 				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, xt.into()))
-					.unwrap();
+					.unwrap()
+					.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 1);
 
 			// when
@@ -1136,7 +1159,9 @@ mod tests {
 				// after validation `IncludeData` will have priority set to 9001
 				// (validate_transaction mock)
 				let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
-				block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())).unwrap();
+				block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
+					.unwrap()
+					.expect_watcher();
 				assert_eq!(pool.validated_pool().status().ready, 1);
 
 				// after validation `Transfer` will have priority set to 4 (validate_transaction
@@ -1147,8 +1172,9 @@ mod tests {
 					amount: 5,
 					nonce: 0,
 				});
-				let watcher =
-					block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())).unwrap();
+				let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
+					.unwrap()
+					.expect_watcher();
 				assert_eq!(pool.validated_pool().status().ready, 2);
 
 				// when
diff --git a/substrate/client/transaction-pool/src/graph/ready.rs b/substrate/client/transaction-pool/src/graph/ready.rs
index 9061d0e2558..b8aef99e638 100644
--- a/substrate/client/transaction-pool/src/graph/ready.rs
+++ b/substrate/client/transaction-pool/src/graph/ready.rs
@@ -232,12 +232,10 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
 		Ok(replaced)
 	}
 
-	/// Fold a list of ready transactions to compute a single value.
-	pub fn fold<R, F: FnMut(Option<R>, &ReadyTx<Hash, Ex>) -> Option<R>>(
-		&mut self,
-		f: F,
-	) -> Option<R> {
-		self.ready.read().values().fold(None, f)
+	/// Fold a list of ready transactions to compute a single value using initial value of
+	/// accumulator.
+	pub fn fold<R, F: FnMut(R, &ReadyTx<Hash, Ex>) -> R>(&self, init: R, f: F) -> R {
+		self.ready.read().values().fold(init, f)
 	}
 
 	/// Returns true if given transaction is part of the queue.
diff --git a/substrate/client/transaction-pool/src/graph/tracked_map.rs b/substrate/client/transaction-pool/src/graph/tracked_map.rs
index 6c3bbbf34b5..fe15c6eca30 100644
--- a/substrate/client/transaction-pool/src/graph/tracked_map.rs
+++ b/substrate/client/transaction-pool/src/graph/tracked_map.rs
@@ -173,6 +173,11 @@ where
 	pub fn len(&mut self) -> usize {
 		self.inner_guard.len()
 	}
+
+	/// Returns an iterator over all key-value pairs.
+	pub fn iter(&self) -> Iter<'_, K, V> {
+		self.inner_guard.iter()
+	}
 }
 
 #[cfg(test)]
diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs
index 3f7bf4773de..bc2b07896db 100644
--- a/substrate/client/transaction-pool/src/graph/validated_pool.rs
+++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs
@@ -18,25 +18,22 @@
 
 use std::{
 	collections::{HashMap, HashSet},
-	hash,
 	sync::Arc,
 };
 
 use crate::{common::log_xt::log_xt_trace, LOG_TARGET};
 use futures::channel::mpsc::{channel, Sender};
 use parking_lot::{Mutex, RwLock};
-use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions};
-use serde::Serialize;
+use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority};
 use sp_blockchain::HashAndNumber;
 use sp_runtime::{
-	traits::{self, SaturatedConversion},
+	traits::SaturatedConversion,
 	transaction_validity::{TransactionTag as Tag, ValidTransaction},
 };
 use std::time::Instant;
 
 use super::{
 	base_pool::{self as base, PruneStatus},
-	listener::Listener,
 	pool::{
 		BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor,
 	},
@@ -79,12 +76,23 @@ impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
 			valid_till: at.saturated_into::<u64>().saturating_add(validity.longevity),
 		})
 	}
+
+	/// Returns priority for valid transaction, None if transaction is not valid.
+	pub fn priority(&self) -> Option<TransactionPriority> {
+		match self {
+			ValidatedTransaction::Valid(base::Transaction { priority, .. }) => Some(*priority),
+			_ => None,
+		}
+	}
 }
 
-/// A type of validated transaction stored in the pool.
+/// A type of validated transaction stored in the validated pool.
 pub type ValidatedTransactionFor<B> =
 	ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>;
 
+/// A type alias representing ValidatedPool listener for given ChainApi type.
+pub type Listener<B> = super::listener::Listener<ExtrinsicHash<B>, B>;
+
 /// A closure that returns true if the local node is a validator that can author blocks.
 #[derive(Clone)]
 pub struct IsValidator(Arc<Box<dyn Fn() -> bool + Send + Sync>>);
@@ -101,12 +109,56 @@ impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
 	}
 }
 
+/// Represents the result of `submit` or `submit_and_watch` operations.
+pub struct BaseSubmitOutcome<B: ChainApi, W> {
+	/// The hash of the submitted transaction.
+	hash: ExtrinsicHash<B>,
+	/// A transaction watcher. This is `Some` for `submit_and_watch` and `None` for `submit`.
+	watcher: Option<W>,
+
+	/// The priority of the transaction. Defaults to None if unknown.
+	priority: Option<TransactionPriority>,
+}
+
+/// Type alias to outcome of submission to `ValidatedPool`.
+pub type ValidatedPoolSubmitOutcome<B> =
+	BaseSubmitOutcome<B, Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>>;
+
+impl<B: ChainApi, W> BaseSubmitOutcome<B, W> {
+	/// Creates a new instance with given hash and priority.
+	pub fn new(hash: ExtrinsicHash<B>, priority: Option<TransactionPriority>) -> Self {
+		Self { hash, priority, watcher: None }
+	}
+
+	/// Sets the transaction watcher.
+	pub fn with_watcher(mut self, watcher: W) -> Self {
+		self.watcher = Some(watcher);
+		self
+	}
+
+	/// Provides priority of submitted transaction.
+	pub fn priority(&self) -> Option<TransactionPriority> {
+		self.priority
+	}
+
+	/// Provides hash of submitted transaction.
+	pub fn hash(&self) -> ExtrinsicHash<B> {
+		self.hash
+	}
+
+	/// Provides a watcher. Should only be called on outcomes of `submit_and_watch`. Otherwise will
+	/// panic (that would mean logical error in program).
+	pub fn expect_watcher(&mut self) -> W {
+		self.watcher.take().expect("watcher was set in submit_and_watch. qed")
+	}
+}
+
 /// Pool that deals with validated transactions.
 pub struct ValidatedPool<B: ChainApi> {
 	api: Arc<B>,
 	is_validator: IsValidator,
 	options: Options,
-	listener: RwLock<Listener<ExtrinsicHash<B>, B>>,
+	listener: RwLock<Listener<B>>,
 	pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>,
 	import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
 	rotator: PoolRotator<ExtrinsicHash<B>>,
@@ -200,7 +252,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 	pub fn submit(
 		&self,
 		txs: impl IntoIterator<Item = ValidatedTransactionFor<B>>,
-	) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
+	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
 		let results = txs
 			.into_iter()
 			.map(|validated_tx| self.submit_one(validated_tx))
@@ -216,7 +268,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 		results
 			.into_iter()
 			.map(|res| match res {
-				Ok(ref hash) if removed.contains(hash) =>
+				Ok(outcome) if removed.contains(&outcome.hash) =>
 					Err(error::Error::ImmediatelyDropped.into()),
 				other => other,
 			})
@@ -224,9 +276,13 @@ impl<B: ChainApi> ValidatedPool<B> {
 	}
 
 	/// Submit single pre-validated transaction to the pool.
-	fn submit_one(&self, tx: ValidatedTransactionFor<B>) -> Result<ExtrinsicHash<B>, B::Error> {
+	fn submit_one(
+		&self,
+		tx: ValidatedTransactionFor<B>,
+	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
 		match tx {
 			ValidatedTransaction::Valid(tx) => {
+				let priority = tx.priority;
 				log::trace!(target: LOG_TARGET, "[{:?}] ValidatedPool::submit_one", tx.hash);
 				if !tx.propagate && !(self.is_validator.0)() {
 					return Err(error::Error::Unactionable.into())
@@ -254,7 +310,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 
 				let mut listener = self.listener.write();
 				fire_events(&mut *listener, &imported);
-				Ok(*imported.hash())
+				Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority)))
 			},
 			ValidatedTransaction::Invalid(hash, err) => {
 				log::trace!(target: LOG_TARGET, "[{:?}] ValidatedPool::submit_one invalid: {:?}", hash, err);
@@ -305,7 +361,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 			// run notifications
 			let mut listener = self.listener.write();
 			for h in &removed {
-				listener.limit_enforced(h);
+				listener.limits_enforced(h);
 			}
 
 			removed
@@ -318,7 +374,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 	pub fn submit_and_watch(
 		&self,
 		tx: ValidatedTransactionFor<B>,
-	) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
+	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
 		match tx {
 			ValidatedTransaction::Valid(tx) => {
 				let hash = self.api.hash_and_length(&tx.data).0;
@@ -326,7 +382,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 				self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
 					.pop()
 					.expect("One extrinsic passed; one result returned; qed")
-					.map(|_| watcher)
+					.map(|outcome| outcome.with_watcher(watcher))
 			},
 			ValidatedTransaction::Invalid(hash, err) => {
 				self.rotator.ban(&Instant::now(), std::iter::once(hash));
@@ -711,11 +767,42 @@ impl<B: ChainApi> ValidatedPool<B> {
 			listener.future(&f.hash);
 		});
 	}
+
+	/// Removes a transaction subtree from the pool, starting from the given transaction hash.
+	///
+	/// This function traverses the dependency graph of transactions and removes the specified
+	/// transaction along with all its descendant transactions from the pool.
+	///
+	/// A `listener_action` callback function is invoked for every transaction that is removed,
+	/// providing a reference to the pool's listener and the hash of the removed transaction. This
+	/// allows to trigger the required events.
+	///
+	/// Returns a vector containing the hashes of all removed transactions, including the root
+	/// transaction specified by `tx_hash`.
+	pub fn remove_subtree<F>(
+		&self,
+		tx_hash: ExtrinsicHash<B>,
+		listener_action: F,
+	) -> Vec<ExtrinsicHash<B>>
+	where
+		F: Fn(&mut Listener<B>, ExtrinsicHash<B>),
+	{
+		self.pool
+			.write()
+			.remove_subtree(&[tx_hash])
+			.into_iter()
+			.map(|tx| {
+				let removed_tx_hash = tx.hash;
+				let mut listener = self.listener.write();
+				listener_action(&mut *listener, removed_tx_hash);
+				removed_tx_hash
+			})
+			.collect::<Vec<_>>()
+	}
 }
 
-fn fire_events<H, B, Ex>(listener: &mut Listener<H, B>, imported: &base::Imported<H, Ex>)
+fn fire_events<B, Ex>(listener: &mut Listener<B>, imported: &base::Imported<ExtrinsicHash<B>, Ex>)
 where
-	H: hash::Hash + Eq + traits::Member + Serialize,
 	B: ChainApi,
 {
 	match *imported {
diff --git a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
index caa09585b28..2a691ae35ea 100644
--- a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
+++ b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
@@ -405,7 +405,8 @@ mod tests {
 			TimedTransactionSource::new_external(false),
 			uxt.clone().into(),
 		))
-		.expect("Should be valid");
+		.expect("Should be valid")
+		.hash();
 
 		block_on(queue.revalidate_later(han_of_block0.hash, vec![uxt_hash]));
 
@@ -448,7 +449,7 @@ mod tests {
 				vec![(source.clone(), uxt0.into()), (source, uxt1.into())],
 			))
 			.into_iter()
-			.map(|r| r.expect("Should be valid"))
+			.map(|r| r.expect("Should be valid").hash())
 			.collect::<Vec<_>>();
 
 		assert_eq!(api.validation_requests().len(), 2);
diff --git a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs
index 2b32704945c..3598f9dbc2a 100644
--- a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs
+++ b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs
@@ -274,7 +274,12 @@ where
 
 		let number = self.api.resolve_block_number(at);
 		let at = HashAndNumber { hash: at, number: number? };
-		Ok(pool.submit_at(&at, xts).await)
+		Ok(pool
+			.submit_at(&at, xts)
+			.await
+			.into_iter()
+			.map(|result| result.map(|outcome| outcome.hash()))
+			.collect())
 	}
 
 	async fn submit_one(
@@ -292,6 +297,7 @@ where
 		let at = HashAndNumber { hash: at, number: number? };
 		pool.submit_one(&at, TimedTransactionSource::from_transaction_source(source, false), xt)
 			.await
+			.map(|outcome| outcome.hash())
 	}
 
 	async fn submit_and_watch(
@@ -308,15 +314,13 @@ where
 		let number = self.api.resolve_block_number(at);
 
 		let at = HashAndNumber { hash: at, number: number? };
-		let watcher = pool
-			.submit_and_watch(
-				&at,
-				TimedTransactionSource::from_transaction_source(source, false),
-				xt,
-			)
-			.await?;
-
-		Ok(watcher.into_stream().boxed())
+		pool.submit_and_watch(
+			&at,
+			TimedTransactionSource::from_transaction_source(source, false),
+			xt,
+		)
+		.await
+		.map(|mut outcome| outcome.expect_watcher().into_stream().boxed())
 	}
 
 	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
@@ -484,7 +488,11 @@ where
 			validity,
 		);
 
-		self.pool.validated_pool().submit(vec![validated]).remove(0)
+		self.pool
+			.validated_pool()
+			.submit(vec![validated])
+			.remove(0)
+			.map(|outcome| outcome.hash())
 	}
 }
 
diff --git a/substrate/client/transaction-pool/tests/fatp_common/mod.rs b/substrate/client/transaction-pool/tests/fatp_common/mod.rs
index aaffebc0db0..530c25caf88 100644
--- a/substrate/client/transaction-pool/tests/fatp_common/mod.rs
+++ b/substrate/client/transaction-pool/tests/fatp_common/mod.rs
@@ -192,12 +192,9 @@ macro_rules! assert_ready_iterator {
 		let output: Vec<_> = ready_iterator.collect();
 		log::debug!(target:LOG_TARGET, "expected: {:#?}", expected);
 		log::debug!(target:LOG_TARGET, "output: {:#?}", output);
+		let output = output.into_iter().map(|t|t.hash).collect::<Vec<_>>();
 		assert_eq!(expected.len(), output.len());
-		assert!(
-			output.iter().zip(expected.iter()).all(|(o,e)| {
-				o.hash == *e
-			})
-		);
+		assert_eq!(output,expected);
 	}};
 }
 
@@ -215,6 +212,18 @@ macro_rules! assert_future_iterator {
 	}};
 }
 
+#[macro_export]
+macro_rules! assert_watcher_stream {
+	($stream:ident, [$( $event:expr ),*]) => {{
+		let expected = vec![ $($event),*];
+		log::debug!(target:LOG_TARGET, "expected: {:#?} {}, block now:", expected, expected.len());
+		let output = futures::executor::block_on_stream($stream).take(expected.len()).collect::<Vec<_>>();
+		log::debug!(target:LOG_TARGET, "output: {:#?}", output);
+		assert_eq!(expected.len(), output.len());
+		assert_eq!(output, expected);
+	}};
+}
+
 pub const SOURCE: TransactionSource = TransactionSource::External;
 
 #[cfg(test)]
diff --git a/substrate/client/transaction-pool/tests/fatp_prios.rs b/substrate/client/transaction-pool/tests/fatp_prios.rs
index 4ed9b450386..af5e7e8c5a6 100644
--- a/substrate/client/transaction-pool/tests/fatp_prios.rs
+++ b/substrate/client/transaction-pool/tests/fatp_prios.rs
@@ -20,13 +20,15 @@
 
 pub mod fatp_common;
 
-use fatp_common::{new_best_block_event, TestPoolBuilder, LOG_TARGET, SOURCE};
+use fatp_common::{invalid_hash, new_best_block_event, TestPoolBuilder, LOG_TARGET, SOURCE};
 use futures::{executor::block_on, FutureExt};
 use sc_transaction_pool::ChainApi;
-use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool, TransactionStatus};
+use sc_transaction_pool_api::{
+	error::Error as TxPoolError, LocalTransactionPool, MaintainedTransactionPool, TransactionPool,
+	TransactionStatus,
+};
 use substrate_test_runtime_client::Sr25519Keyring::*;
 use substrate_test_runtime_transaction_pool::uxt;
-
 #[test]
 fn fatp_prio_ready_higher_evicts_lower() {
 	sp_tracing::try_init_simple();
@@ -247,3 +249,312 @@ fn fatp_prio_watcher_future_lower_prio_gets_dropped_from_all_views() {
 	assert_ready_iterator!(header01.hash(), pool, [xt2, xt1]);
 	assert_ready_iterator!(header02.hash(), pool, [xt2, xt1]);
 }
+
+#[test]
+fn fatp_prios_watcher_full_mempool_higher_prio_is_accepted() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+	api.set_nonce(api.genesis_hash(), Eve.into(), 600);
+	api.set_nonce(api.genesis_hash(), Ferdie.into(), 700);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+
+	let xt3 = uxt(Dave, 500);
+
+	let xt4 = uxt(Eve, 600);
+	let xt5 = uxt(Ferdie, 700);
+
+	api.set_priority(&xt0, 1);
+	api.set_priority(&xt1, 2);
+	api.set_priority(&xt2, 3);
+	api.set_priority(&xt3, 4);
+
+	api.set_priority(&xt4, 5);
+	api.set_priority(&xt5, 6);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 2);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));
+
+	let _xt2_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let _xt3_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_pool_status!(header02.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 4);
+
+	let header03 = api.push_block_with_parent(header02.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header02.hash()), header03.hash())));
+
+	let _xt4_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap();
+	let _xt5_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt5.clone())).unwrap();
+
+	assert_pool_status!(header03.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 4);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+
+	assert_ready_iterator!(header01.hash(), pool, []);
+	assert_ready_iterator!(header02.hash(), pool, [xt3, xt2]);
+	assert_ready_iterator!(header03.hash(), pool, [xt5, xt4]);
+}
+
+#[test]
+fn fatp_prios_watcher_full_mempool_higher_prio_is_accepted_with_subtree() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(4).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Alice, 201);
+	let xt2 = uxt(Alice, 202);
+	let xt3 = uxt(Bob, 300);
+	let xt4 = uxt(Charlie, 400);
+
+	api.set_priority(&xt0, 1);
+	api.set_priority(&xt1, 3);
+	api.set_priority(&xt2, 3);
+	api.set_priority(&xt3, 2);
+	api.set_priority(&xt4, 2);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_ready_iterator!(header01.hash(), pool, [xt3, xt0, xt1, xt2]);
+	assert_pool_status!(header01.hash(), &pool, 4, 0);
+	assert_eq!(pool.mempool_len().1, 4);
+
+	let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap();
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt3, xt4]);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt4_watcher, [TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_prios_watcher_full_mempool_higher_prio_is_accepted_with_subtree2() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(4).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Alice, 201);
+	let xt2 = uxt(Alice, 202);
+	let xt3 = uxt(Bob, 300);
+	let xt4 = uxt(Charlie, 400);
+
+	api.set_priority(&xt0, 1);
+	api.set_priority(&xt1, 3);
+	api.set_priority(&xt2, 3);
+	api.set_priority(&xt3, 2);
+	api.set_priority(&xt4, 2);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_ready_iterator!(header01.hash(), pool, [xt3, xt0, xt1, xt2]);
+	assert_pool_status!(header01.hash(), &pool, 4, 0);
+	assert_eq!(pool.mempool_len().1, 4);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));
+
+	let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap();
+	assert_ready_iterator!(header01.hash(), pool, [xt3]);
+	assert_pool_status!(header02.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header02.hash(), pool, [xt3, xt4]);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt4_watcher, [TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_prios_watcher_full_mempool_lower_prio_gets_rejected() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(2).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+	let xt3 = uxt(Dave, 500);
+
+	api.set_priority(&xt0, 2);
+	api.set_priority(&xt1, 2);
+	api.set_priority(&xt2, 2);
+	api.set_priority(&xt3, 1);
+
+	let _xt0_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let _xt1_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 2);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));
+
+	assert_pool_status!(header02.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 2);
+
+	assert_ready_iterator!(header01.hash(), pool, [xt0, xt1]);
+	assert_ready_iterator!(header02.hash(), pool, [xt0, xt1]);
+
+	let result2 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).map(|_| ());
+	assert!(matches!(result2.as_ref().unwrap_err().0, TxPoolError::ImmediatelyDropped));
+	let result3 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).map(|_| ());
+	assert!(matches!(result3.as_ref().unwrap_err().0, TxPoolError::ImmediatelyDropped));
+}
+
+#[test]
+fn fatp_prios_watcher_full_mempool_does_not_keep_dropped_transaction() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+	let xt3 = uxt(Dave, 500);
+
+	api.set_priority(&xt0, 2);
+	api.set_priority(&xt1, 2);
+	api.set_priority(&xt2, 2);
+	api.set_priority(&xt3, 2);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_prios_submit_local_full_mempool_higher_prio_is_accepted() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+	api.set_nonce(api.genesis_hash(), Eve.into(), 600);
+	api.set_nonce(api.genesis_hash(), Ferdie.into(), 700);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+
+	let xt3 = uxt(Dave, 500);
+
+	let xt4 = uxt(Eve, 600);
+	let xt5 = uxt(Ferdie, 700);
+
+	api.set_priority(&xt0, 1);
+	api.set_priority(&xt1, 2);
+	api.set_priority(&xt2, 3);
+	api.set_priority(&xt3, 4);
+
+	api.set_priority(&xt4, 5);
+	api.set_priority(&xt5, 6);
+	pool.submit_local(invalid_hash(), xt0.clone()).unwrap();
+	pool.submit_local(invalid_hash(), xt1.clone()).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().0, 2);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));
+
+	pool.submit_local(invalid_hash(), xt2.clone()).unwrap();
+	pool.submit_local(invalid_hash(), xt3.clone()).unwrap();
+
+	assert_pool_status!(header02.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().0, 4);
+
+	let header03 = api.push_block_with_parent(header02.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header02.hash()), header03.hash())));
+
+	pool.submit_local(invalid_hash(), xt4.clone()).unwrap();
+	pool.submit_local(invalid_hash(), xt5.clone()).unwrap();
+
+	assert_pool_status!(header03.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().0, 4);
+
+	assert_ready_iterator!(header01.hash(), pool, []);
+	assert_ready_iterator!(header02.hash(), pool, [xt3, xt2]);
+	assert_ready_iterator!(header03.hash(), pool, [xt5, xt4]);
+}
diff --git a/substrate/client/transaction-pool/tests/pool.rs b/substrate/client/transaction-pool/tests/pool.rs
index de35726435f..c70f4548331 100644
--- a/substrate/client/transaction-pool/tests/pool.rs
+++ b/substrate/client/transaction-pool/tests/pool.rs
@@ -158,6 +158,7 @@ fn prune_tags_should_work() {
 	let (pool, api) = pool();
 	let hash209 =
 		block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt(Alice, 209).into()))
+			.map(|o| o.hash())
 			.unwrap();
 	block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt(Alice, 210).into()))
 		.unwrap();
@@ -184,10 +185,13 @@ fn prune_tags_should_work() {
 fn should_ban_invalid_transactions() {
 	let (pool, api) = pool();
 	let uxt = Arc::from(uxt(Alice, 209));
-	let hash =
-		block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())).unwrap();
+	let hash = block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone()))
+		.unwrap()
+		.hash();
 	pool.validated_pool().remove_invalid(&[hash]);
-	block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())).unwrap_err();
+	block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone()))
+		.map(|_| ())
+		.unwrap_err();
 
 	// when
 	let pending: Vec<_> = pool
@@ -198,7 +202,9 @@ fn should_ban_invalid_transactions() {
 	assert_eq!(pending, Vec::<Nonce>::new());
 
 	// then
-	block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())).unwrap_err();
+	block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone()))
+		.map(|_| ())
+		.unwrap_err();
 }
 
 #[test]
diff --git a/substrate/test-utils/runtime/transaction-pool/src/lib.rs b/substrate/test-utils/runtime/transaction-pool/src/lib.rs
index 93e5855eefc..f88694fb107 100644
--- a/substrate/test-utils/runtime/transaction-pool/src/lib.rs
+++ b/substrate/test-utils/runtime/transaction-pool/src/lib.rs
@@ -352,9 +352,18 @@ impl ChainApi for TestApi {
 	fn validate_transaction(
 		&self,
 		at: <Self::Block as BlockT>::Hash,
-		_source: TransactionSource,
+		source: TransactionSource,
 		uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
 	) -> Self::ValidationFuture {
+		ready(self.validate_transaction_blocking(at, source, uxt))
+	}
+
+	fn validate_transaction_blocking(
+		&self,
+		at: <Self::Block as BlockT>::Hash,
+		_source: TransactionSource,
+		uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
+	) -> Result<TransactionValidity, Error> {
 		let uxt = (*uxt).clone();
 		self.validation_requests.write().push(uxt.clone());
 		let block_number;
@@ -374,16 +383,12 @@ impl ChainApi for TestApi {
 				// the transaction. (This is not required for this test function, but in real
 				// environment it would fail because of this).
 				if !found_best {
-					return ready(Ok(Err(TransactionValidityError::Invalid(
-						InvalidTransaction::Custom(1),
-					))))
+					return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(1))))
 				}
 			},
 			Ok(None) =>
-				return ready(Ok(Err(TransactionValidityError::Invalid(
-					InvalidTransaction::Custom(2),
-				)))),
-			Err(e) => return ready(Err(e)),
+				return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(2)))),
+			Err(e) => return Err(e),
 		}
 
 		let (requires, provides) = if let Ok(transfer) = TransferData::try_from(&uxt) {
@@ -423,7 +428,7 @@ impl ChainApi for TestApi {
 
 			if self.enable_stale_check && transfer.nonce < chain_nonce {
 				log::info!("test_api::validate_transaction: invalid_transaction(stale)....");
-				return ready(Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Stale))))
+				return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Stale)))
 			}
 
 			(requires, provides)
@@ -433,7 +438,7 @@ impl ChainApi for TestApi {
 
 		if self.chain.read().invalid_hashes.contains(&self.hash_and_length(&uxt).0) {
 			log::info!("test_api::validate_transaction: invalid_transaction....");
-			return ready(Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0)))))
+			return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0))))
 		}
 
 		let priority = self.chain.read().priorities.get(&self.hash_and_length(&uxt).0).cloned();
@@ -447,16 +452,7 @@ impl ChainApi for TestApi {
 
 		(self.valid_modifier.read())(&mut validity);
 
-		ready(Ok(Ok(validity)))
-	}
-
-	fn validate_transaction_blocking(
-		&self,
-		_at: <Self::Block as BlockT>::Hash,
-		_source: TransactionSource,
-		_uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
-	) -> Result<TransactionValidity, Error> {
-		unimplemented!();
+		Ok(Ok(validity))
 	}
 
 	fn block_id_to_number(
-- 
GitLab