diff --git a/prdoc/pr_6647.prdoc b/prdoc/pr_6647.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..47af9924ef1c077218ebf5b90d17a6b647d96632
--- /dev/null
+++ b/prdoc/pr_6647.prdoc
@@ -0,0 +1,8 @@
+title: '`fatxpool`: proper handling of priorities when mempool is full'
+doc:
+- audience: Node Dev
+  description: |-
+    Higher-priority transactions can now replace lower-priority transactions even when the internal _tx_mem_pool_ is full.
+crates:
+- name: sc-transaction-pool
+  bump: minor
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
index d69aa37c94a1acf3f8f37622d06f9f4e6209e43e..bf61558b00b0daf01c85e5a8ae4dec525615e7c4 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
@@ -53,11 +53,13 @@ pub struct DroppedTransaction<Hash> {
 }
 
 impl<Hash> DroppedTransaction<Hash> {
-	fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
+	/// Creates a new instance with reason set to `DroppedReason::Usurped(by)`.
+	pub fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
 		Self { reason: DroppedReason::Usurped(by), tx_hash }
 	}
 
-	fn new_enforced_by_limts(tx_hash: Hash) -> Self {
+	/// Creates a new instance with reason set to `DroppedReason::LimitsEnforced`.
+	pub fn new_enforced_by_limts(tx_hash: Hash) -> Self {
 		Self { reason: DroppedReason::LimitsEnforced, tx_hash }
 	}
 }
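For context, the constructors are used along the lines of the sketch below when a replacement evicts a transaction (see `attempt_transaction_replacement_inner` later in this PR); `view_store`, `worst_hash` and `new_tx_hash` are assumed to be in scope, and the snippet is illustrative only:

```rust
// A transaction dropped because mempool limits were enforced:
view_store
	.listener
	.transaction_dropped(DroppedTransaction::new_enforced_by_limts(worst_hash));

// A transaction replaced (usurped) by a higher-priority one:
view_store
	.listener
	.transaction_dropped(DroppedTransaction::new_usurped(worst_hash, new_tx_hash));
```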
@@ -256,11 +258,13 @@ where
 				self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash);
 			},
 			TransactionStatus::Ready | TransactionStatus::InBlock(..) => {
-				// note: if future transaction was once seens as the ready we may want to treat it
-				// as ready transactions. Unreferenced future transactions are more likely to be
-				// removed when the last referencing view is removed then ready transactions.
-				// Transcaction seen as ready is likely quite close to be included in some
-				// future fork.
+				// note: if a future transaction was once seen as ready, we may want to treat it
+				// as a ready transaction. The rationale is as follows: we want to remove
+				// unreferenced future transactions when the last referencing view is removed
+				// (to avoid clogging the mempool). For ready transactions we prefer to keep
+				// them in the mempool even if no view is currently referencing them. A future
+				// transaction once seen as ready is likely quite close to being included in
+				// some future fork (it is close to being ready, so we make an exception and
+				// treat such a transaction as ready).
 				if let Some(mut views) = self.future_transaction_views.remove(&tx_hash) {
 					views.insert(block_hash);
 					self.ready_transaction_views.insert(tx_hash, views);
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
index e57256943ccfe37d771e9f7de2d4f17b2798c919..766045718252007455e4b781335d1c0261077a0f 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
@@ -31,7 +31,10 @@ use crate::{
 	api::FullChainApi,
 	common::log_xt::log_xt_trace,
 	enactment_state::{EnactmentAction, EnactmentState},
-	fork_aware_txpool::{dropped_watcher::DroppedReason, revalidation_worker},
+	fork_aware_txpool::{
+		dropped_watcher::{DroppedReason, DroppedTransaction},
+		revalidation_worker,
+	},
 	graph::{
 		self,
 		base_pool::{TimedTransactionSource, Transaction},
@@ -49,14 +52,16 @@ use futures::{
 use parking_lot::Mutex;
 use prometheus_endpoint::Registry as PrometheusRegistry;
 use sc_transaction_pool_api::{
-	ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolStatus, TransactionFor,
-	TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
+	error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
+	MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionPriority,
+	TransactionSource, TransactionStatusStreamFor, TxHash,
 };
 use sp_blockchain::{HashAndNumber, TreeRoute};
 use sp_core::traits::SpawnEssentialNamed;
 use sp_runtime::{
 	generic::BlockId,
 	traits::{Block as BlockT, NumberFor},
+	transaction_validity::{TransactionValidityError, ValidTransaction},
 };
 use std::{
 	collections::{HashMap, HashSet},
@@ -287,7 +292,7 @@ where
 				DroppedReason::LimitsEnforced => {},
 			};
 
-			mempool.remove_dropped_transaction(&dropped_tx_hash).await;
+			mempool.remove_transaction(&dropped_tx_hash);
 			view_store.listener.transaction_dropped(dropped);
 			import_notification_sink.clean_notified_items(&[dropped_tx_hash]);
 		}
@@ -598,7 +603,7 @@ where
 /// out:
 /// [ Ok(xth0), Ok(xth1), Err ]
 /// ```
-fn reduce_multiview_result<H, E>(input: HashMap<H, Vec<Result<H, E>>>) -> Vec<Result<H, E>> {
+fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
 	let mut values = input.values();
 	let Some(first) = values.next() else {
 		return Default::default();
@@ -650,9 +655,28 @@ where
 		let mempool_results = self.mempool.extend_unwatched(source, &xts);
 
 		if view_store.is_empty() {
-			return Ok(mempool_results.into_iter().map(|r| r.map(|r| r.hash)).collect::<Vec<_>>())
+			return Ok(mempool_results
+				.into_iter()
+				.map(|r| r.map(|r| r.hash).map_err(Into::into))
+				.collect::<Vec<_>>())
 		}
 
+		// For transactions that were immediately dropped by the mempool, retry the insertion by
+		// attempting to replace a lower-priority transaction already in the mempool.
+		let retries = mempool_results
+			.into_iter()
+			.zip(xts.clone())
+			.map(|(result, xt)| async move {
+				match result {
+					Err(TxPoolApiError::ImmediatelyDropped) =>
+						self.attempt_transaction_replacement(source, false, xt).await,
+					_ => result,
+				}
+			})
+			.collect::<Vec<_>>();
+
+		let mempool_results = futures::future::join_all(retries).await;
+
+		// Collect transactions that were successfully submitted to the mempool...
 		let to_be_submitted = mempool_results
 			.iter()
 			.zip(xts)
@@ -664,22 +688,47 @@ where
 		self.metrics
 			.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));
 
+		// ... and submit them to the view_store. Please note that transactions rejected by the
+		// mempool are not sent here.
 		let mempool = self.mempool.clone();
 		let results_map = view_store.submit(to_be_submitted.into_iter()).await;
 		let mut submission_results = reduce_multiview_result(results_map).into_iter();
 
+		// Note for composing final result:
+		//
+		// For each failed insertion into the mempool, the mempool result should be placed into
+		// the returned vector.
+		//
+		// For each successful insertion into the mempool, the corresponding
+		// view_store submission result needs to be examined:
+		// - If there is an error during view_store submission, the transaction is removed from
+		// the mempool, and the final result recorded in the vector for this transaction is the
+		// view_store submission error.
+		//
+		// - If the view_store submission is successful, the transaction priority is updated in the
+		// mempool.
+		//
+		// Finally, the hashes of the updated transactions, or the submission errors (either from
+		// the mempool or the view_store), are collected into the returned vector.
 		Ok(mempool_results
 				.into_iter()
 				.map(|result| {
-					result.and_then(|insertion| {
-						submission_results
-							.next()
-							.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.")
-							.inspect_err(|_|
-								mempool.remove(insertion.hash)
-							)
+					result
+						.map_err(Into::into)
+						.and_then(|insertion| {
+							submission_results
+								.next()
+								.expect("The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.")
+								.inspect_err(|_|{
+									mempool.remove_transaction(&insertion.hash);
+								})
 					})
+
 				})
+				.map(|r| r.map(|r| {
+					mempool.update_transaction_priority(&r);
+					r.hash()
+				}))
 				.collect::<Vec<_>>())
 	}
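The composition rules spelled out in the note above can be modelled with plain types. The sketch below is illustrative only: `u64` stands in for transaction hashes, `String` for errors, and the real mempool/view_store side effects are only hinted at in comments:

```rust
/// Simplified model of how mempool results and view_store results are zipped together.
fn compose_results(
	mempool_results: Vec<Result<u64, String>>,
	view_store_results: Vec<Result<u64, String>>,
) -> Vec<Result<u64, String>> {
	let mut submission_results = view_store_results.into_iter();
	mempool_results
		.into_iter()
		.map(|result| {
			// Mempool errors are passed through unchanged; only mempool successes
			// consume one view_store submission result.
			result.and_then(|hash| {
				submission_results
					.next()
					.expect("one view_store result per transaction accepted by the mempool")
					// In the real pool a view_store error additionally removes the
					// transaction from the mempool, while a success updates its priority.
					.map(|_| hash)
			})
		})
		.collect()
}
```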
 
@@ -712,10 +761,13 @@ where
 	) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
 		log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count());
 		let xt = Arc::from(xt);
-		let InsertionInfo { hash: xt_hash, source: timed_source } =
+
+		let InsertionInfo { hash: xt_hash, source: timed_source, .. } =
 			match self.mempool.push_watched(source, xt.clone()) {
 				Ok(result) => result,
-				Err(e) => return Err(e),
+				Err(TxPoolApiError::ImmediatelyDropped) =>
+					self.attempt_transaction_replacement(source, true, xt.clone()).await?,
+				Err(e) => return Err(e.into()),
 			};
 
 		self.metrics.report(|metrics| metrics.submitted_transactions.inc());
@@ -723,7 +775,13 @@ where
 		self.view_store
 			.submit_and_watch(at, timed_source, xt)
 			.await
-			.inspect_err(|_| self.mempool.remove(xt_hash))
+			.inspect_err(|_| {
+				self.mempool.remove_transaction(&xt_hash);
+			})
+			.map(|mut outcome| {
+				self.mempool.update_transaction_priority(&outcome);
+				outcome.expect_watcher()
+			})
 	}
 
 	/// Intended to remove transactions identified by the given hashes, and any dependent
@@ -828,22 +886,16 @@ where
 	}
 }
 
-impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
-	for ForkAwareTxPool<FullChainApi<Client, Block>, Block>
+impl<ChainApi, Block> sc_transaction_pool_api::LocalTransactionPool
+	for ForkAwareTxPool<ChainApi, Block>
 where
 	Block: BlockT,
+	ChainApi: 'static + graph::ChainApi<Block = Block>,
 	<Block as BlockT>::Hash: Unpin,
-	Client: sp_api::ProvideRuntimeApi<Block>
-		+ sc_client_api::BlockBackend<Block>
-		+ sc_client_api::blockchain::HeaderBackend<Block>
-		+ sp_runtime::traits::BlockIdTo<Block>
-		+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
-	Client: Send + Sync + 'static,
-	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
 {
 	type Block = Block;
-	type Hash = ExtrinsicHash<FullChainApi<Client, Block>>;
-	type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
+	type Hash = ExtrinsicHash<ChainApi>;
+	type Error = ChainApi::Error;
 
 	fn submit_local(
 		&self,
@@ -852,12 +904,29 @@ where
 	) -> Result<Self::Hash, Self::Error> {
 		log::debug!(target: LOG_TARGET, "fatp::submit_local views:{}", self.active_views_count());
 		let xt = Arc::from(xt);
-		let InsertionInfo { hash: xt_hash, .. } = self
-			.mempool
-			.extend_unwatched(TransactionSource::Local, &[xt.clone()])
-			.remove(0)?;
 
-		self.view_store.submit_local(xt).or_else(|_| Ok(xt_hash))
+		let result =
+			self.mempool.extend_unwatched(TransactionSource::Local, &[xt.clone()]).remove(0);
+
+		let insertion = match result {
+			Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
+				TransactionSource::Local,
+				false,
+				xt.clone(),
+			),
+			_ => result,
+		}?;
+
+		self.view_store
+			.submit_local(xt)
+			.inspect_err(|_| {
+				self.mempool.remove_transaction(&insertion.hash);
+			})
+			.map(|outcome| {
+				self.mempool.update_transaction_priority(&outcome);
+				outcome.hash()
+			})
+			.or_else(|_| Ok(insertion.hash))
 	}
 }
 
@@ -1109,7 +1178,11 @@ where
 			.await
 			.into_iter()
 			.zip(hashes)
-			.map(|(result, tx_hash)| result.or_else(|_| Err(tx_hash)))
+			.map(|(result, tx_hash)| {
+				result
+					.map(|outcome| self.mempool.update_transaction_priority(&outcome.into()))
+					.or_else(|_| Err(tx_hash))
+			})
 			.collect::<Vec<_>>();
 
 		let submitted_count = watched_results.len();
@@ -1131,7 +1204,7 @@ where
 			for result in watched_results {
 				if let Err(tx_hash) = result {
 					self.view_store.listener.invalidate_transactions(&[tx_hash]);
-					self.mempool.remove(tx_hash);
+					self.mempool.remove_transaction(&tx_hash);
 				}
 			}
 		}
@@ -1263,6 +1336,101 @@ where
 	fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
 		self.api.hash_and_length(xt).0
 	}
+
+	/// Attempts to find and replace a lower-priority transaction in the transaction pool with a new
+	/// one.
+	///
+	/// This asynchronous function verifies the new transaction against the most recent view. If a
+	/// transaction with a lower priority exists in the transaction pool, it is replaced with the
+	/// new transaction.
+	///
+	/// If no lower-priority transaction is found, the function returns an error indicating the
+	/// transaction was dropped immediately.
+	async fn attempt_transaction_replacement(
+		&self,
+		source: TransactionSource,
+		watched: bool,
+		xt: ExtrinsicFor<ChainApi>,
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
+		let at = self
+			.view_store
+			.most_recent_view
+			.read()
+			.ok_or(TxPoolApiError::ImmediatelyDropped)?;
+
+		let (best_view, _) = self
+			.view_store
+			.get_view_at(at, false)
+			.ok_or(TxPoolApiError::ImmediatelyDropped)?;
+
+		let (xt_hash, validated_tx) = best_view
+			.pool
+			.verify_one(
+				best_view.at.hash,
+				best_view.at.number,
+				TimedTransactionSource::from_transaction_source(source, false),
+				xt.clone(),
+				crate::graph::CheckBannedBeforeVerify::Yes,
+			)
+			.await;
+
+		let Some(priority) = validated_tx.priority() else {
+			return Err(TxPoolApiError::ImmediatelyDropped)
+		};
+
+		self.attempt_transaction_replacement_inner(xt, xt_hash, priority, source, watched)
+	}
+
+	/// Sync version of [`Self::attempt_transaction_replacement`].
+	fn attempt_transaction_replacement_sync(
+		&self,
+		source: TransactionSource,
+		watched: bool,
+		xt: ExtrinsicFor<ChainApi>,
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
+		let at = self
+			.view_store
+			.most_recent_view
+			.read()
+			.ok_or(TxPoolApiError::ImmediatelyDropped)?;
+
+		let ValidTransaction { priority, .. } = self
+			.api
+			.validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))
+			.map_err(|_| TxPoolApiError::ImmediatelyDropped)?
+			.map_err(|e| match e {
+				TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
+				TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
+			})?;
+		let xt_hash = self.hash_of(&xt);
+		self.attempt_transaction_replacement_inner(xt, xt_hash, priority, source, watched)
+	}
+
+	fn attempt_transaction_replacement_inner(
+		&self,
+		xt: ExtrinsicFor<ChainApi>,
+		tx_hash: ExtrinsicHash<ChainApi>,
+		priority: TransactionPriority,
+		source: TransactionSource,
+		watched: bool,
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
+		let insertion_info =
+			self.mempool.try_insert_with_replacement(xt, priority, source, watched)?;
+
+		for worst_hash in &insertion_info.removed {
+			log::trace!(target: LOG_TARGET, "removed: {worst_hash:?} replaced by {tx_hash:?}");
+			self.view_store
+				.listener
+				.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));
+
+			self.view_store
+				.remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
+					listener.limits_enforced(&removed_tx_hash);
+				});
+		}
+
+		return Ok(insertion_info)
+	}
 }
 
 #[async_trait]
@@ -1410,7 +1578,7 @@ mod reduce_multiview_result_tests {
 	fn empty() {
 		sp_tracing::try_init_simple();
 		let input = HashMap::default();
-		let r = reduce_multiview_result::<H256, Error>(input);
+		let r = reduce_multiview_result::<H256, H256, Error>(input);
 		assert!(r.is_empty());
 	}
 
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
index 989ae4425dc4832e4549f37775f0cacdcae80cce..c8a4d0c72dd36ab933f2713cf956e2752973c92c 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
@@ -26,7 +26,10 @@
 //!   it), while on other forks tx can be valid. Depending on which view is chosen to be cloned,
 //!   such transaction could not be present in the newly created view.
 
-use super::{metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener};
+use super::{
+	metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener,
+	view_store::ViewStoreSubmitOutcome,
+};
 use crate::{
 	common::log_xt::log_xt_trace,
 	graph,
@@ -35,15 +38,20 @@ use crate::{
 };
 use futures::FutureExt;
 use itertools::Itertools;
-use sc_transaction_pool_api::TransactionSource;
+use parking_lot::RwLock;
+use sc_transaction_pool_api::{TransactionPriority, TransactionSource};
 use sp_blockchain::HashAndNumber;
 use sp_runtime::{
 	traits::Block as BlockT,
 	transaction_validity::{InvalidTransaction, TransactionValidityError},
 };
 use std::{
+	cmp::Ordering,
 	collections::HashMap,
-	sync::{atomic, atomic::AtomicU64, Arc},
+	sync::{
+		atomic::{self, AtomicU64},
+		Arc,
+	},
 	time::Instant,
 };
 
@@ -77,6 +85,9 @@ where
 	source: TimedTransactionSource,
 	/// When the transaction was revalidated, used to periodically revalidate the mem pool buffer.
 	validated_at: AtomicU64,
+	/// Priority of the transaction at some block. It is assumed that it will not change often.
+	/// `None` if not known.
+	priority: RwLock<Option<TransactionPriority>>,
 	//todo: we need to add future / ready status at finalized block.
 	//If future transactions are stuck in tx_mem_pool (due to limits being hit), we need a means
 	// to replace them somehow with newly coming transactions.
@@ -101,23 +112,50 @@ where
 
 	/// Creates a new instance of wrapper for unwatched transaction.
 	fn new_unwatched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>, bytes: usize) -> Self {
-		Self {
-			watched: false,
-			tx,
-			source: TimedTransactionSource::from_transaction_source(source, true),
-			validated_at: AtomicU64::new(0),
-			bytes,
-		}
+		Self::new(false, source, tx, bytes)
 	}
 
 	/// Creates a new instance of wrapper for watched transaction.
 	fn new_watched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>, bytes: usize) -> Self {
+		Self::new(true, source, tx, bytes)
+	}
+
+	/// Creates a new instance of wrapper for a transaction with no priority.
+	fn new(
+		watched: bool,
+		source: TransactionSource,
+		tx: ExtrinsicFor<ChainApi>,
+		bytes: usize,
+	) -> Self {
+		Self::new_with_optional_priority(watched, source, tx, bytes, None)
+	}
+
+	/// Creates a new instance of wrapper for a transaction with given priority.
+	fn new_with_priority(
+		watched: bool,
+		source: TransactionSource,
+		tx: ExtrinsicFor<ChainApi>,
+		bytes: usize,
+		priority: TransactionPriority,
+	) -> Self {
+		Self::new_with_optional_priority(watched, source, tx, bytes, Some(priority))
+	}
+
+	/// Creates a new instance of wrapper for a transaction with optional priority.
+	fn new_with_optional_priority(
+		watched: bool,
+		source: TransactionSource,
+		tx: ExtrinsicFor<ChainApi>,
+		bytes: usize,
+		priority: Option<TransactionPriority>,
+	) -> Self {
 		Self {
-			watched: true,
+			watched,
 			tx,
 			source: TimedTransactionSource::from_transaction_source(source, true),
 			validated_at: AtomicU64::new(0),
 			bytes,
+			priority: priority.into(),
 		}
 	}
 
@@ -132,6 +170,11 @@ where
 	pub(crate) fn source(&self) -> TimedTransactionSource {
 		self.source.clone()
 	}
+
+	/// Returns the priority of the transaction.
+	pub(crate) fn priority(&self) -> Option<TransactionPriority> {
+		*self.priority.read()
+	}
 }
 
 impl<ChainApi, Block> Size for Arc<TxInMemPool<ChainApi, Block>>
@@ -191,11 +234,15 @@ where
 pub(super) struct InsertionInfo<Hash> {
 	pub(super) hash: Hash,
 	pub(super) source: TimedTransactionSource,
+	pub(super) removed: Vec<Hash>,
 }
 
 impl<Hash> InsertionInfo<Hash> {
 	fn new(hash: Hash, source: TimedTransactionSource) -> Self {
-		Self { hash, source }
+		Self::new_with_removed(hash, source, Default::default())
+	}
+	fn new_with_removed(hash: Hash, source: TimedTransactionSource, removed: Vec<Hash>) -> Self {
+		Self { hash, source, removed }
 	}
 }
 
@@ -279,27 +326,109 @@ where
 		&self,
 		hash: ExtrinsicHash<ChainApi>,
 		tx: TxInMemPool<ChainApi, Block>,
-	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, ChainApi::Error> {
-		let bytes = self.transactions.bytes();
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
 		let mut transactions = self.transactions.write();
+
+		let bytes = self.transactions.bytes();
+
 		let result = match (
-			!self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
+			self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
 			transactions.contains_key(&hash),
 		) {
-			(true, false) => {
+			(false, false) => {
 				let source = tx.source();
 				transactions.insert(hash, Arc::from(tx));
 				Ok(InsertionInfo::new(hash, source))
 			},
 			(_, true) =>
-				Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)).into()),
-			(false, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped.into()),
+				Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash))),
+			(true, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped),
 		};
 		log::trace!(target: LOG_TARGET, "[{:?}] mempool::try_insert: {:?}", hash, result.as_ref().map(|r| r.hash));
 
 		result
 	}
 
+	/// Attempts to insert a new transaction into the memory pool, dropping some worse existing
+	/// transactions to make room.
+	///
+	/// A "worse" transaction means a transaction with a lower priority, or an older transaction
+	/// with the same priority.
+	///
+	/// This operation will not overflow the limits of the mempool: the cumulative size of the
+	/// removed transactions will be equal to (or greater than) the size of the newly inserted
+	/// transaction.
+	///
+	/// Returns a `Result` containing `InsertionInfo` if the new transaction is successfully
+	/// inserted; otherwise, returns an appropriate error indicating the failure.
+	pub(super) fn try_insert_with_replacement(
+		&self,
+		new_tx: ExtrinsicFor<ChainApi>,
+		priority: TransactionPriority,
+		source: TransactionSource,
+		watched: bool,
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
+		let (hash, length) = self.api.hash_and_length(&new_tx);
+		let new_tx = TxInMemPool::new_with_priority(watched, source, new_tx, length, priority);
+		if new_tx.bytes > self.max_transactions_total_bytes {
+			return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
+		}
+
+		let mut transactions = self.transactions.write();
+
+		if transactions.contains_key(&hash) {
+			return Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)));
+		}
+
+		let mut sorted = transactions
+			.iter()
+			.filter_map(|(h, v)| v.priority().map(|_| (*h, v.clone())))
+			.collect::<Vec<_>>();
+
+		// When pushing a higher-priority transaction, we need to find a number of lower-priority
+		// txs such that the sum of their bytes is greater than or equal to the size of the new tx;
+		// otherwise we could overflow the size limits. The naive way to do it: rev-sort by
+		// priority and eat the tail.
+
+		// reverse (oldest, lowest prio last)
+		sorted.sort_by(|(_, a), (_, b)| match b.priority().cmp(&a.priority()) {
+			Ordering::Equal => match (a.source.timestamp, b.source.timestamp) {
+				(Some(a), Some(b)) => b.cmp(&a),
+				_ => Ordering::Equal,
+			},
+			ordering => ordering,
+		});
+
+		let mut total_size_removed = 0usize;
+		let mut to_be_removed = vec![];
+		let free_bytes = self.max_transactions_total_bytes - self.transactions.bytes();
+
+		loop {
+			let Some((worst_hash, worst_tx)) = sorted.pop() else {
+				return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
+			};
+
+			if worst_tx.priority() >= new_tx.priority() {
+				return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped);
+			}
+
+			total_size_removed += worst_tx.bytes;
+			to_be_removed.push(worst_hash);
+
+			if free_bytes + total_size_removed >= new_tx.bytes {
+				break;
+			}
+		}
+
+		let source = new_tx.source();
+		transactions.insert(hash, Arc::from(new_tx));
+		for worst_hash in &to_be_removed {
+			transactions.remove(worst_hash);
+		}
+		debug_assert!(!self.is_limit_exceeded(transactions.len(), self.transactions.bytes()));
+
+		Ok(InsertionInfo::new_with_removed(hash, source, to_be_removed))
+	}
+
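The eviction strategy above ("rev-sort by priority and eat the tail") can be illustrated with a self-contained sketch; `Entry` and `select_victims` below are hypothetical stand-ins for the mempool's internal types, not part of the crate:

```rust
#[derive(Clone, Debug)]
struct Entry {
	hash: u64,
	priority: u64,
	timestamp: u64, // insertion time; larger means newer
	bytes: usize,
}

/// Picks entries to evict so that `free_bytes` plus the evicted bytes cover `new_bytes`.
/// Returns `None` if that would require evicting an entry whose priority is not strictly
/// lower than `new_priority` (mirroring the `ImmediatelyDropped` error above).
fn select_victims(
	mut entries: Vec<Entry>,
	free_bytes: usize,
	new_bytes: usize,
	new_priority: u64,
) -> Option<Vec<u64>> {
	// Highest priority first; within equal priority, newest first, so that the
	// lowest-priority / oldest entries end up at the tail.
	entries.sort_by(|a, b| {
		b.priority.cmp(&a.priority).then_with(|| b.timestamp.cmp(&a.timestamp))
	});

	let mut removed_bytes = 0usize;
	let mut victims = Vec::new();
	loop {
		let worst = entries.pop()?;
		if worst.priority >= new_priority {
			return None;
		}
		removed_bytes += worst.bytes;
		victims.push(worst.hash);
		if free_bytes + removed_bytes >= new_bytes {
			break;
		}
	}
	Some(victims)
}

#[test]
fn evicts_oldest_lowest_priority_first() {
	let pool = vec![
		Entry { hash: 1, priority: 10, timestamp: 1, bytes: 100 },
		Entry { hash: 2, priority: 10, timestamp: 2, bytes: 100 },
		Entry { hash: 3, priority: 50, timestamp: 3, bytes: 100 },
	];
	// Freeing 150 bytes evicts the two priority-10 entries, oldest first.
	assert_eq!(select_victims(pool, 0, 150, 100), Some(vec![1, 2]));
}
```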
 	/// Adds a new unwatched transactions to the internal buffer not exceeding the limit.
 	///
 	/// Returns the vector of results for each transaction, the order corresponds to the input
@@ -308,7 +437,8 @@ where
 		&self,
 		source: TransactionSource,
 		xts: &[ExtrinsicFor<ChainApi>],
-	) -> Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, ChainApi::Error>> {
+	) -> Vec<Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error>>
+	{
 		let result = xts
 			.iter()
 			.map(|xt| {
@@ -325,20 +455,11 @@ where
 		&self,
 		source: TransactionSource,
 		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, ChainApi::Error> {
+	) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, sc_transaction_pool_api::error::Error> {
 		let (hash, length) = self.api.hash_and_length(&xt);
 		self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length))
 	}
 
-	/// Removes transaction from the memory pool which are specified by the given list of hashes.
-	pub(super) async fn remove_dropped_transaction(
-		&self,
-		dropped: &ExtrinsicHash<ChainApi>,
-	) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
-		log::debug!(target: LOG_TARGET, "[{:?}] mempool::remove_dropped_transaction", dropped);
-		self.transactions.write().remove(dropped)
-	}
-
 	/// Clones and returns a `HashMap` of references to all unwatched transactions in the memory
 	/// pool.
 	pub(super) fn clone_unwatched(
@@ -362,9 +483,13 @@ where
 			.collect::<HashMap<_, _>>()
 	}
 
-	/// Removes a transaction from the memory pool based on a given hash.
-	pub(super) fn remove(&self, hash: ExtrinsicHash<ChainApi>) {
-		let _ = self.transactions.write().remove(&hash);
+	/// Removes the transaction with the given hash from the memory pool.
+	pub(super) fn remove_transaction(
+		&self,
+		hash: &ExtrinsicHash<ChainApi>,
+	) -> Option<Arc<TxInMemPool<ChainApi, Block>>> {
+		log::debug!(target: LOG_TARGET, "[{hash:?}] mempool::remove_transaction");
+		self.transactions.write().remove(hash)
 	}
 
 	/// Revalidates a batch of transactions against the provided finalized block.
@@ -462,6 +587,17 @@ where
 		});
 		self.listener.invalidate_transactions(&invalid_hashes);
 	}
+
+	/// Updates the priority of a transaction stored in the mempool, using the provided view_store
+	/// submission outcome.
+	pub(super) fn update_transaction_priority(&self, outcome: &ViewStoreSubmitOutcome<ChainApi>) {
+		outcome.priority().map(|priority| {
+			self.transactions
+				.write()
+				.get_mut(&outcome.hash())
+				.map(|p| *p.priority.write() = Some(priority))
+		});
+	}
 }
 
 #[cfg(test)]
@@ -583,6 +719,9 @@ mod tx_mem_pool_tests {
 		assert_eq!(mempool.unwatched_and_watched_count(), (10, 5));
 	}
 
+	/// Size of a large extrinsic (as created by `large_uxt`).
+	const LARGE_XT_SIZE: usize = 1129;
+
 	fn large_uxt(x: usize) -> Extrinsic {
 		ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build()
 	}
@@ -592,8 +731,7 @@ mod tx_mem_pool_tests {
 		sp_tracing::try_init_simple();
 		let max = 10;
 		let api = Arc::from(TestApi::default());
-		//size of large extrinsic is: 1129
-		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * 1129);
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
 
 		let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
 
@@ -617,4 +755,200 @@ mod tx_mem_pool_tests {
 			sc_transaction_pool_api::error::Error::ImmediatelyDropped
 		));
 	}
+
+	#[test]
+	fn replacing_txs_works_for_same_tx_size() {
+		sp_tracing::try_init_simple();
+		let max = 10;
+		let api = Arc::from(TestApi::default());
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
+
+		let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
+
+		let low_prio = 0u64;
+		let hi_prio = u64::MAX;
+
+		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
+		let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts
+			.iter()
+			.map(|t| {
+				let h = api.hash_and_length(t).0;
+				(ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
+			})
+			.unzip();
+
+		let results = mempool.extend_unwatched(TransactionSource::External, &xts);
+		assert!(results.iter().all(Result::is_ok));
+		assert_eq!(mempool.bytes(), total_xts_bytes);
+
+		submit_outcomes
+			.into_iter()
+			.for_each(|o| mempool.update_transaction_priority(&o));
+
+		let xt = Arc::from(large_uxt(98));
+		let hash = api.hash_and_length(&xt).0;
+		let result = mempool
+			.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false)
+			.unwrap();
+
+		assert_eq!(result.hash, hash);
+		assert_eq!(result.removed, hashes[0..1]);
+	}
+
+	#[test]
+	fn replacing_txs_removes_proper_size_of_txs() {
+		sp_tracing::try_init_simple();
+		let max = 10;
+		let api = Arc::from(TestApi::default());
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE);
+
+		let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
+
+		let low_prio = 0u64;
+		let hi_prio = u64::MAX;
+
+		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
+		let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts
+			.iter()
+			.map(|t| {
+				let h = api.hash_and_length(t).0;
+				(ViewStoreSubmitOutcome::new(h, Some(low_prio)), h)
+			})
+			.unzip();
+
+		let results = mempool.extend_unwatched(TransactionSource::External, &xts);
+		assert!(results.iter().all(Result::is_ok));
+		assert_eq!(mempool.bytes(), total_xts_bytes);
+		assert_eq!(total_xts_bytes, max * LARGE_XT_SIZE);
+
+		submit_outcomes
+			.into_iter()
+			.for_each(|o| mempool.update_transaction_priority(&o));
+
+		//this one should drop 2 xts (size: 1130):
+		let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 1025]).build());
+		let (hash, length) = api.hash_and_length(&xt);
+		assert_eq!(length, 1130);
+		let result = mempool
+			.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false)
+			.unwrap();
+
+		assert_eq!(result.hash, hash);
+		assert_eq!(result.removed, hashes[0..2]);
+	}
+
+	#[test]
+	fn replacing_txs_removes_proper_size_and_prios() {
+		sp_tracing::try_init_simple();
+		const COUNT: usize = 10;
+		let api = Arc::from(TestApi::default());
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
+
+		let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
+
+		let hi_prio = u64::MAX;
+
+		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
+		let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts
+			.iter()
+			.enumerate()
+			.map(|(prio, t)| {
+				let h = api.hash_and_length(t).0;
+				(ViewStoreSubmitOutcome::new(h, Some((COUNT - prio).try_into().unwrap())), h)
+			})
+			.unzip();
+
+		let results = mempool.extend_unwatched(TransactionSource::External, &xts);
+		assert!(results.iter().all(Result::is_ok));
+		assert_eq!(mempool.bytes(), total_xts_bytes);
+
+		submit_outcomes
+			.into_iter()
+			.for_each(|o| mempool.update_transaction_priority(&o));
+
+		//this one should drop 3 xts (each of size 1129)
+		let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
+		let (hash, length) = api.hash_and_length(&xt);
+		// overhead is 105, thus length: 105 + 2154
+		assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
+		let result = mempool
+			.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false)
+			.unwrap();
+
+		assert_eq!(result.hash, hash);
+		assert!(result.removed.iter().eq(hashes[COUNT - 3..COUNT].iter().rev()));
+	}
+
+	#[test]
+	fn replacing_txs_skips_lower_prio_tx() {
+		sp_tracing::try_init_simple();
+		const COUNT: usize = 10;
+		let api = Arc::from(TestApi::default());
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
+
+		let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
+
+		let hi_prio = 100u64;
+		let low_prio = 10u64;
+
+		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
+		let submit_outcomes = xts
+			.iter()
+			.map(|t| {
+				let h = api.hash_and_length(t).0;
+				ViewStoreSubmitOutcome::new(h, Some(hi_prio))
+			})
+			.collect::<Vec<_>>();
+
+		let results = mempool.extend_unwatched(TransactionSource::External, &xts);
+		assert!(results.iter().all(Result::is_ok));
+		assert_eq!(mempool.bytes(), total_xts_bytes);
+
+		submit_outcomes
+			.into_iter()
+			.for_each(|o| mempool.update_transaction_priority(&o));
+
+		let xt = Arc::from(large_uxt(98));
+		let result =
+			mempool.try_insert_with_replacement(xt, low_prio, TransactionSource::External, false);
+
+		// lower prio tx is rejected immediately
+		assert!(matches!(
+			result.unwrap_err(),
+			sc_transaction_pool_api::error::Error::ImmediatelyDropped
+		));
+	}
+
+	#[test]
+	fn replacing_txs_is_skipped_if_prios_are_not_set() {
+		sp_tracing::try_init_simple();
+		const COUNT: usize = 10;
+		let api = Arc::from(TestApi::default());
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE);
+
+		let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
+
+		let hi_prio = u64::MAX;
+
+		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
+
+		let results = mempool.extend_unwatched(TransactionSource::External, &xts);
+		assert!(results.iter().all(Result::is_ok));
+		assert_eq!(mempool.bytes(), total_xts_bytes);
+
+		//this one could drop 3 xts (each of size 1129)
+		let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build());
+		let length = api.hash_and_length(&xt).1;
+		// overhead is 105, thus length: 105 + 2154
+		assert_eq!(length, 2 * LARGE_XT_SIZE + 1);
+
+		let result =
+			mempool.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false);
+
+		// we did not update priorities (update_transaction_priority was not called):
+		assert!(matches!(
+			result.unwrap_err(),
+			sc_transaction_pool_api::error::Error::ImmediatelyDropped
+		));
+	}
 }
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
index 3cbb8fa4871d03e6ed9ac80be177b1a37af3fe59..a35d68120a3abf5eeb99b1589523e4b609825c32 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
@@ -28,7 +28,7 @@ use crate::{
 	common::log_xt::log_xt_trace,
 	graph::{
 		self, base_pool::TimedTransactionSource, watcher::Watcher, ExtrinsicFor, ExtrinsicHash,
-		IsValidator, ValidatedTransaction, ValidatedTransactionFor,
+		IsValidator, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor,
 	},
 	LOG_TARGET,
 };
@@ -158,7 +158,7 @@ where
 	pub(super) async fn submit_many(
 		&self,
 		xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)>,
-	) -> Vec<Result<ExtrinsicHash<ChainApi>, ChainApi::Error>> {
+	) -> Vec<Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error>> {
 		if log::log_enabled!(target: LOG_TARGET, log::Level::Trace) {
 			let xts = xts.into_iter().collect::<Vec<_>>();
 			log_xt_trace!(target: LOG_TARGET, xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0), "[{:?}] view::submit_many at:{}", self.at.hash);
@@ -173,7 +173,7 @@ where
 		&self,
 		source: TimedTransactionSource,
 		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<Watcher<ExtrinsicHash<ChainApi>, ExtrinsicHash<ChainApi>>, ChainApi::Error> {
+	) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
 		log::trace!(target: LOG_TARGET, "[{:?}] view::submit_and_watch at:{}", self.pool.validated_pool().api().hash_and_length(&xt).0, self.at.hash);
 		self.pool.submit_and_watch(&self.at, source, xt).await
 	}
@@ -182,7 +182,7 @@ where
 	pub(super) fn submit_local(
 		&self,
 		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
+	) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
 		let (hash, length) = self.pool.validated_pool().api().hash_and_length(&xt);
 		log::trace!(target: LOG_TARGET, "[{:?}] view::submit_local at:{}", hash, self.at.hash);
 
@@ -460,4 +460,18 @@ where
 		const IGNORE_BANNED: bool = false;
 		self.pool.validated_pool().check_is_known(tx_hash, IGNORE_BANNED).is_err()
 	}
+
+	/// Removes the whole transaction subtree from the inner pool.
+	///
+	/// Refer to [`crate::graph::ValidatedPool::remove_subtree`] for more details.
+	pub fn remove_subtree<F>(
+		&self,
+		tx_hash: ExtrinsicHash<ChainApi>,
+		listener_action: F,
+	) -> Vec<ExtrinsicHash<ChainApi>>
+	where
+		F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>),
+	{
+		self.pool.validated_pool().remove_subtree(tx_hash, listener_action)
+	}
 }
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
index a06c051f0a7eb8be4af9cc285e68ecfd74cb1344..43ed5bbf8869f360f641c1ae9590dfedd56dc89d 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
@@ -27,7 +27,7 @@ use crate::{
 	graph::{
 		self,
 		base_pool::{TimedTransactionSource, Transaction},
-		ExtrinsicFor, ExtrinsicHash, TransactionFor,
+		BaseSubmitOutcome, ExtrinsicFor, ExtrinsicHash, TransactionFor, ValidatedPoolSubmitOutcome,
 	},
 	ReadyIteratorFor, LOG_TARGET,
 };
@@ -38,20 +38,18 @@ use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus};
 use sp_blockchain::TreeRoute;
 use sp_runtime::{generic::BlockId, traits::Block as BlockT};
 use std::{
-	collections::{hash_map::Entry, HashMap},
+	collections::{hash_map::Entry, HashMap, HashSet},
 	sync::Arc,
 	time::Instant,
 };
 
-/// Helper struct to keep the context for transaction replacements.
+/// Helper struct to maintain the context for pending transaction submission, executed for
+/// newly inserted views.
 #[derive(Clone)]
-struct PendingTxReplacement<ChainApi>
+struct PendingTxSubmission<ChainApi>
 where
 	ChainApi: graph::ChainApi,
 {
-	/// Indicates if the new transaction was already submitted to all the views in the view_store.
-	/// If true, it can be removed after inserting any new view.
-	processed: bool,
 	/// New transaction replacing the old one.
 	xt: ExtrinsicFor<ChainApi>,
 	/// Source of the transaction.
@@ -60,13 +58,84 @@ where
 	watched: bool,
 }
 
-impl<ChainApi> PendingTxReplacement<ChainApi>
+/// Helper type representing the callback allowing to trigger per-transaction events on
+/// `ValidatedPool`'s listener.
+type RemovalListener<ChainApi> =
+	Arc<dyn Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>) + Send + Sync>;
+
+/// Helper struct to maintain the context for pending transaction removal, executed for
+/// newly inserted views.
+struct PendingTxRemoval<ChainApi>
+where
+	ChainApi: graph::ChainApi,
+{
+	/// Hash of the transaction that will be removed.
+	xt_hash: ExtrinsicHash<ChainApi>,
+	/// Action that shall be executed on underlying `ValidatedPool`'s listener.
+	listener_action: RemovalListener<ChainApi>,
+}
+
+/// This enum represents an action that should be executed on the newly built
+/// view before this view is inserted into the view store.
+enum PreInsertAction<ChainApi>
+where
+	ChainApi: graph::ChainApi,
+{
+	/// Represents the action of submitting a new transaction. Intended to be used to handle
+	/// usurped transactions.
+	SubmitTx(PendingTxSubmission<ChainApi>),
+
+	/// Represents the action of removing a subtree of transactions.
+	RemoveSubtree(PendingTxRemoval<ChainApi>),
+}
+
+/// Represents a task awaiting execution, to be performed immediately prior to the view insertion
+/// into the view store.
+struct PendingPreInsertTask<ChainApi>
+where
+	ChainApi: graph::ChainApi,
+{
+	/// The action to be applied when inserting a new view.
+	action: PreInsertAction<ChainApi>,
+	/// Indicates if the action was already applied to all the views in the view_store.
+	/// If true, it can be removed after inserting any new view.
+	processed: bool,
+}
+
+impl<ChainApi> PendingPreInsertTask<ChainApi>
 where
 	ChainApi: graph::ChainApi,
 {
-	/// Creates new unprocessed instance of pending transaction replacement.
-	fn new(xt: ExtrinsicFor<ChainApi>, source: TimedTransactionSource, watched: bool) -> Self {
-		Self { processed: false, xt, source, watched }
+	/// Creates new unprocessed instance of pending transaction submission.
+	fn new_submission_action(
+		xt: ExtrinsicFor<ChainApi>,
+		source: TimedTransactionSource,
+		watched: bool,
+	) -> Self {
+		Self {
+			processed: false,
+			action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source, watched }),
+		}
+	}
+
+	/// Creates new unprocessed instance of pending transaction removal.
+	fn new_removal_action(
+		xt_hash: ExtrinsicHash<ChainApi>,
+		listener: RemovalListener<ChainApi>,
+	) -> Self {
+		Self {
+			processed: false,
+			action: PreInsertAction::RemoveSubtree(PendingTxRemoval {
+				xt_hash,
+				listener_action: listener,
+			}),
+		}
+	}
+
+	/// Marks a task as done for every view present in the view store, which means that it can be
+	/// removed upon insertion of any new view.
+	fn mark_processed(&mut self) {
+		self.processed = true;
 	}
 }
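The two constructors above are invoked from the view store when pending work has to be replayed on views that are not built yet; the snippet is condensed from their call sites later in this PR (the surrounding variables such as `entry`, `xt`, `source`, `watched`, `xt_hash` and `listener_action` are assumed to be in scope) and is purely illustrative:

```rust
// Schedule a (replacing) transaction submission for newly built views:
entry.insert(PendingPreInsertTask::new_submission_action(
	xt.clone(),
	source.clone(),
	watched,
));

// Schedule a transaction-subtree removal for newly built views:
entry.insert(PendingPreInsertTask::new_removal_action(
	xt_hash,
	Arc::from(listener_action.clone()),
));
```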
 
@@ -100,9 +169,20 @@ where
 	/// notifcication threads. It is meant to assure that replaced transaction is also removed from
 	/// newly built views in maintain process.
 	///
-	/// The map's key is hash of replaced extrinsic.
-	pending_txs_replacements:
-		RwLock<HashMap<ExtrinsicHash<ChainApi>, PendingTxReplacement<ChainApi>>>,
+	/// The map's key is the hash of the actionable extrinsic (to avoid duplicated entries).
+	pending_txs_tasks: RwLock<HashMap<ExtrinsicHash<ChainApi>, PendingPreInsertTask<ChainApi>>>,
+}
+
+/// Type alias for the outcome of a submission to the `ViewStore`.
+pub(super) type ViewStoreSubmitOutcome<ChainApi> =
+	BaseSubmitOutcome<ChainApi, TxStatusStream<ChainApi>>;
+
+impl<ChainApi: graph::ChainApi> From<ValidatedPoolSubmitOutcome<ChainApi>>
+	for ViewStoreSubmitOutcome<ChainApi>
+{
+	fn from(value: ValidatedPoolSubmitOutcome<ChainApi>) -> Self {
+		Self::new(value.hash(), value.priority())
+	}
 }
 
 impl<ChainApi, Block> ViewStore<ChainApi, Block>
@@ -124,7 +204,7 @@ where
 			listener,
 			most_recent_view: RwLock::from(None),
 			dropped_stream_controller,
-			pending_txs_replacements: Default::default(),
+			pending_txs_tasks: Default::default(),
 		}
 	}
 
@@ -132,7 +212,7 @@ where
 	pub(super) async fn submit(
 		&self,
 		xts: impl IntoIterator<Item = (TimedTransactionSource, ExtrinsicFor<ChainApi>)> + Clone,
-	) -> HashMap<Block::Hash, Vec<Result<ExtrinsicHash<ChainApi>, ChainApi::Error>>> {
+	) -> HashMap<Block::Hash, Vec<Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error>>> {
 		let submit_futures = {
 			let active_views = self.active_views.read();
 			active_views
@@ -140,7 +220,16 @@ where
 				.map(|(_, view)| {
 					let view = view.clone();
 					let xts = xts.clone();
-					async move { (view.at.hash, view.submit_many(xts).await) }
+					async move {
+						(
+							view.at.hash,
+							view.submit_many(xts)
+								.await
+								.into_iter()
+								.map(|r| r.map(Into::into))
+								.collect::<Vec<_>>(),
+						)
+					}
 				})
 				.collect::<Vec<_>>()
 		};
@@ -153,7 +242,7 @@ where
 	pub(super) fn submit_local(
 		&self,
 		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
+	) -> Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error> {
 		let active_views = self
 			.active_views
 			.read()
@@ -168,12 +257,14 @@ where
 			.map(|view| view.submit_local(xt.clone()))
 			.find_or_first(Result::is_ok);
 
-		if let Some(Err(err)) = result {
-			log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err);
-			return Err(err)
-		};
-
-		Ok(tx_hash)
+		match result {
+			Some(Err(err)) => {
+				log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err);
+				Err(err)
+			},
+			None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None)),
+			Some(Ok(r)) => Ok(r.into()),
+		}
 	}
 
 	/// Import a single extrinsic and starts to watch its progress in the pool.
@@ -188,7 +279,7 @@ where
 		_at: Block::Hash,
 		source: TimedTransactionSource,
 		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<TxStatusStream<ChainApi>, ChainApi::Error> {
+	) -> Result<ViewStoreSubmitOutcome<ChainApi>, ChainApi::Error> {
 		let tx_hash = self.api.hash_and_length(&xt).0;
 		let Some(external_watcher) = self.listener.create_external_watcher_for_tx(tx_hash) else {
 			return Err(PoolError::AlreadyImported(Box::new(tx_hash)).into())
@@ -203,13 +294,13 @@ where
 					let source = source.clone();
 					async move {
 						match view.submit_and_watch(source, xt).await {
-							Ok(watcher) => {
+							Ok(mut result) => {
 								self.listener.add_view_watcher_for_tx(
 									tx_hash,
 									view.at.hash,
-									watcher.into_stream().boxed(),
+									result.expect_watcher().into_stream().boxed(),
 								);
-								Ok(())
+								Ok(result)
 							},
 							Err(e) => Err(e),
 						}
@@ -217,17 +308,20 @@ where
 				})
 				.collect::<Vec<_>>()
 		};
-		let maybe_error = futures::future::join_all(submit_and_watch_futures)
+		let result = futures::future::join_all(submit_and_watch_futures)
 			.await
 			.into_iter()
 			.find_or_first(Result::is_ok);
 
-		if let Some(Err(err)) = maybe_error {
-			log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err);
-			return Err(err);
-		};
-
-		Ok(external_watcher)
+		match result {
+			Some(Err(err)) => {
+				log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err);
+				return Err(err);
+			},
+			Some(Ok(result)) =>
+				Ok(ViewStoreSubmitOutcome::from(result).with_watcher(external_watcher)),
+			None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None).with_watcher(external_watcher)),
+		}
 	}
 
 	/// Returns the pool status for every active view.
@@ -575,8 +669,12 @@ where
 		replaced: ExtrinsicHash<ChainApi>,
 		watched: bool,
 	) {
-		if let Entry::Vacant(entry) = self.pending_txs_replacements.write().entry(replaced) {
-			entry.insert(PendingTxReplacement::new(xt.clone(), source.clone(), watched));
+		if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(replaced) {
+			entry.insert(PendingPreInsertTask::new_submission_action(
+				xt.clone(),
+				source.clone(),
+				watched,
+			));
 		} else {
 			return
 		};
@@ -586,8 +684,8 @@ where
 
 		self.replace_transaction_in_views(source, xt, xt_hash, replaced, watched).await;
 
-		if let Some(replacement) = self.pending_txs_replacements.write().get_mut(&replaced) {
-			replacement.processed = true;
+		if let Some(replacement) = self.pending_txs_tasks.write().get_mut(&replaced) {
+			replacement.mark_processed();
 		}
 	}
 
@@ -596,18 +694,25 @@ where
 	/// After application, all already processed replacements are removed.
 	async fn apply_pending_tx_replacements(&self, view: Arc<View<ChainApi>>) {
 		let mut futures = vec![];
-		for replacement in self.pending_txs_replacements.read().values() {
-			let xt_hash = self.api.hash_and_length(&replacement.xt).0;
-			futures.push(self.replace_transaction_in_view(
-				view.clone(),
-				replacement.source.clone(),
-				replacement.xt.clone(),
-				xt_hash,
-				replacement.watched,
-			));
+		for replacement in self.pending_txs_tasks.read().values() {
+			match replacement.action {
+				PreInsertAction::SubmitTx(ref submission) => {
+					let xt_hash = self.api.hash_and_length(&submission.xt).0;
+					futures.push(self.replace_transaction_in_view(
+						view.clone(),
+						submission.source.clone(),
+						submission.xt.clone(),
+						xt_hash,
+						submission.watched,
+					));
+				},
+				PreInsertAction::RemoveSubtree(ref removal) => {
+					view.remove_subtree(removal.xt_hash, &*removal.listener_action);
+				},
+			}
 		}
 		let _results = futures::future::join_all(futures).await;
-		self.pending_txs_replacements.write().retain(|_, r| r.processed);
+		self.pending_txs_tasks.write().retain(|_, r| r.processed);
 	}
 
 	/// Submits `xt` to the given view.
@@ -623,11 +728,11 @@ where
 	) {
 		if watched {
 			match view.submit_and_watch(source, xt).await {
-				Ok(watcher) => {
+				Ok(mut result) => {
 					self.listener.add_view_watcher_for_tx(
 						xt_hash,
 						view.at.hash,
-						watcher.into_stream().boxed(),
+						result.expect_watcher().into_stream().boxed(),
 					);
 				},
 				Err(e) => {
@@ -690,4 +795,58 @@ where
 		};
 		let _results = futures::future::join_all(submit_futures).await;
 	}
+
+	/// Removes a transaction subtree from every view in the view_store, starting from the given
+	/// transaction hash.
+	///
+	/// This function traverses the dependency graph of transactions and removes the specified
+	/// transaction along with all its descendant transactions from every view.
+	///
+	/// A `listener_action` callback function is invoked for every transaction that is removed,
+	/// providing a reference to the pool's listener and the hash of the removed transaction. This
+	/// allows triggering the required events. Note that the listener may be called multiple times
+	/// for the same hash.
+	///
+	/// The function will also schedule view pre-insertion actions to ensure that the transaction
+	/// is also removed from any newly created views.
+	///
+	/// Returns a vector containing the hashes of all removed transactions, including the root
+	/// transaction specified by `xt_hash`. The vector contains only unique hashes.
+	pub(super) fn remove_transaction_subtree<F>(
+		&self,
+		xt_hash: ExtrinsicHash<ChainApi>,
+		listener_action: F,
+	) -> Vec<ExtrinsicHash<ChainApi>>
+	where
+		F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>)
+			+ Clone
+			+ Send
+			+ Sync
+			+ 'static,
+	{
+		if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(xt_hash) {
+			entry.insert(PendingPreInsertTask::new_removal_action(
+				xt_hash,
+				Arc::from(listener_action.clone()),
+			));
+		};
+
+		let mut seen = HashSet::new();
+
+		let removed = self
+			.active_views
+			.read()
+			.iter()
+			.chain(self.inactive_views.read().iter())
+			.filter(|(_, view)| view.is_imported(&xt_hash))
+			.flat_map(|(_, view)| view.remove_subtree(xt_hash, &listener_action))
+			.filter(|xt_hash| seen.insert(*xt_hash))
+			.collect();
+
+		if let Some(removal_action) = self.pending_txs_tasks.write().get_mut(&xt_hash) {
+			removal_action.mark_processed();
+		}
+
+		removed
+	}
 }
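The traversal and de-duplication behaviour described above can be modelled in isolation. The sketch below uses plain `u64` hashes and a closure in place of the pool listener; `collect_removed` is a hypothetical helper, not part of the crate:

```rust
use std::collections::HashSet;

/// Simplified model: each inner vector is the set of hashes one view reports as removed.
/// The callback may fire more than once for the same hash (once per view that contained
/// it), while the returned vector contains every removed hash exactly once.
fn collect_removed(per_view: Vec<Vec<u64>>, mut on_removed: impl FnMut(u64)) -> Vec<u64> {
	let mut seen = HashSet::new();
	per_view
		.into_iter()
		.flatten()
		.inspect(|hash| on_removed(*hash))
		.filter(|hash| seen.insert(*hash))
		.collect()
}
```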
diff --git a/substrate/client/transaction-pool/src/graph/base_pool.rs b/substrate/client/transaction-pool/src/graph/base_pool.rs
index 04eaa998f42e6ece34c6b77dfb0b8433cf59b813..3b4afc88b7897cd063f38ad4d553265aa456bb91 100644
--- a/substrate/client/transaction-pool/src/graph/base_pool.rs
+++ b/substrate/client/transaction-pool/src/graph/base_pool.rs
@@ -453,27 +453,29 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
 
 		while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
 			// find the worst transaction
-			let worst = self.ready.fold::<TransactionRef<Hash, Ex>, _>(|worst, current| {
-				let transaction = &current.transaction;
-				worst
-					.map(|worst| {
-						// Here we don't use `TransactionRef`'s ordering implementation because
-						// while it prefers priority like need here, it also prefers older
-						// transactions for inclusion purposes and limit enforcement needs to prefer
-						// newer transactions instead and drop the older ones.
-						match worst.transaction.priority.cmp(&transaction.transaction.priority) {
-							Ordering::Less => worst,
-							Ordering::Equal =>
-								if worst.insertion_id > transaction.insertion_id {
-									transaction.clone()
-								} else {
-									worst
-								},
-							Ordering::Greater => transaction.clone(),
-						}
-					})
-					.or_else(|| Some(transaction.clone()))
-			});
+			let worst =
+				self.ready.fold::<Option<TransactionRef<Hash, Ex>>, _>(None, |worst, current| {
+					let transaction = &current.transaction;
+					worst
+						.map(|worst| {
+							// Here we don't use `TransactionRef`'s ordering implementation because
+							// while it prefers priority like need here, it also prefers older
+							// transactions for inclusion purposes and limit enforcement needs to
+							// prefer newer transactions instead and drop the older ones.
+							match worst.transaction.priority.cmp(&transaction.transaction.priority)
+							{
+								Ordering::Less => worst,
+								Ordering::Equal =>
+									if worst.insertion_id > transaction.insertion_id {
+										transaction.clone()
+									} else {
+										worst
+									},
+								Ordering::Greater => transaction.clone(),
+							}
+						})
+						.or_else(|| Some(transaction.clone()))
+				});
 
 			if let Some(worst) = worst {
 				removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs
index 41daf5491f70941a7d476d3858004d5b190f4970..7b09ee4c640958736d8acfddd5baab572e1bf530 100644
--- a/substrate/client/transaction-pool/src/graph/listener.rs
+++ b/substrate/client/transaction-pool/src/graph/listener.rs
@@ -126,8 +126,8 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H
 	}
 
 	/// Transaction was dropped from the pool because of enforcing the limit.
-	pub fn limit_enforced(&mut self, tx: &H) {
-		trace!(target: LOG_TARGET, "[{:?}] Dropped (limit enforced)", tx);
+	pub fn limits_enforced(&mut self, tx: &H) {
+		trace!(target: LOG_TARGET, "[{:?}] Dropped (limits enforced)", tx);
 		self.fire(tx, |watcher| watcher.limit_enforced());
 
 		if let Some(ref sink) = self.dropped_by_limits_sink {
diff --git a/substrate/client/transaction-pool/src/graph/mod.rs b/substrate/client/transaction-pool/src/graph/mod.rs
index d93898b1b22ab7dbeeb910bd80e4da7f644fceae..2114577f4dee74bf51e25a1375b8650cf8f4acf7 100644
--- a/substrate/client/transaction-pool/src/graph/mod.rs
+++ b/substrate/client/transaction-pool/src/graph/mod.rs
@@ -41,6 +41,12 @@ pub use self::pool::{
 	BlockHash, ChainApi, ExtrinsicFor, ExtrinsicHash, NumberFor, Options, Pool, RawExtrinsicFor,
 	TransactionFor, ValidatedTransactionFor,
 };
-pub use validated_pool::{IsValidator, ValidatedTransaction};
+pub use validated_pool::{
+	BaseSubmitOutcome, IsValidator, Listener, ValidatedPoolSubmitOutcome, ValidatedTransaction,
+};
 
+pub(crate) use self::pool::CheckBannedBeforeVerify;
 pub(crate) use listener::DroppedByLimitsEvent;
+
+#[cfg(doc)]
+pub(crate) use validated_pool::ValidatedPool;
diff --git a/substrate/client/transaction-pool/src/graph/pool.rs b/substrate/client/transaction-pool/src/graph/pool.rs
index 4c0ace0b1c73a65072b979a533f8e72c5f2e825b..403712662adae418c0e640824ac1599cf5a5248d 100644
--- a/substrate/client/transaction-pool/src/graph/pool.rs
+++ b/substrate/client/transaction-pool/src/graph/pool.rs
@@ -37,7 +37,7 @@ use std::{
 use super::{
 	base_pool as base,
 	validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
-	watcher::Watcher,
+	ValidatedPoolSubmitOutcome,
 };
 
 /// Modification notification event stream type;
@@ -168,7 +168,7 @@ impl Options {
 /// Should we check that the transaction is banned
 /// in the pool, before we verify it?
 #[derive(Copy, Clone)]
-enum CheckBannedBeforeVerify {
+pub(crate) enum CheckBannedBeforeVerify {
 	Yes,
 	No,
 }
@@ -204,7 +204,7 @@ impl<B: ChainApi> Pool<B> {
 		&self,
 		at: &HashAndNumber<B::Block>,
 		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
-	) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
+	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
 		let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::Yes).await;
 		self.validated_pool.submit(validated_transactions.into_values())
 	}
@@ -216,7 +216,7 @@ impl<B: ChainApi> Pool<B> {
 		&self,
 		at: &HashAndNumber<B::Block>,
 		xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
-	) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
+	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
 		let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::No).await;
 		self.validated_pool.submit(validated_transactions.into_values())
 	}
@@ -227,7 +227,7 @@ impl<B: ChainApi> Pool<B> {
 		at: &HashAndNumber<B::Block>,
 		source: base::TimedTransactionSource,
 		xt: ExtrinsicFor<B>,
-	) -> Result<ExtrinsicHash<B>, B::Error> {
+	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
 		let res = self.submit_at(at, std::iter::once((source, xt))).await.pop();
 		res.expect("One extrinsic passed; one result returned; qed")
 	}
@@ -238,7 +238,7 @@ impl<B: ChainApi> Pool<B> {
 		at: &HashAndNumber<B::Block>,
 		source: base::TimedTransactionSource,
 		xt: ExtrinsicFor<B>,
-	) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
+	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
 		let (_, tx) = self
 			.verify_one(at.hash, at.number, source, xt, CheckBannedBeforeVerify::Yes)
 			.await;
@@ -432,7 +432,7 @@ impl<B: ChainApi> Pool<B> {
 	}
 
 	/// Returns future that validates single transaction at given block.
-	async fn verify_one(
+	pub(crate) async fn verify_one(
 		&self,
 		block_hash: <B::Block as BlockT>::Hash,
 		block_number: NumberFor<B>,
@@ -539,6 +539,7 @@ mod tests {
 				.into(),
 			),
 		)
+		.map(|outcome| outcome.hash())
 		.unwrap();
 
 		// then
@@ -567,7 +568,10 @@ mod tests {
 
 		// when
 		let txs = txs.into_iter().map(|x| (SOURCE, Arc::from(x))).collect::<Vec<_>>();
-		let hashes = block_on(pool.submit_at(&api.expect_hash_and_number(0), txs));
+		let hashes = block_on(pool.submit_at(&api.expect_hash_and_number(0), txs))
+			.into_iter()
+			.map(|r| r.map(|o| o.hash()))
+			.collect::<Vec<_>>();
 		log::debug!("--> {hashes:#?}");
 
 		// then
@@ -591,7 +595,8 @@ mod tests {
 
 		// when
 		pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
-		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()));
+		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
+			.map(|o| o.hash());
 		assert_eq!(pool.validated_pool().status().ready, 0);
 		assert_eq!(pool.validated_pool().status().future, 0);
 
@@ -614,7 +619,8 @@ mod tests {
 		let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
 
 		// when
-		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()));
+		let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
+			.map(|o| o.hash());
 
 		// then
 		assert_matches!(res.unwrap_err(), error::Error::Unactionable);
@@ -642,7 +648,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.hash();
 			let hash1 = block_on(
 				pool.submit_one(
 					&han_of_block0,
@@ -656,7 +663,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.hash();
 			// future doesn't count
 			let _hash = block_on(
 				pool.submit_one(
@@ -671,7 +679,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.hash();
 
 			assert_eq!(pool.validated_pool().status().ready, 2);
 			assert_eq!(pool.validated_pool().status().future, 1);
@@ -704,7 +713,8 @@ mod tests {
 				.into(),
 			),
 		)
-		.unwrap();
+		.unwrap()
+		.hash();
 		let hash2 = block_on(
 			pool.submit_one(
 				&han_of_block0,
@@ -718,7 +728,8 @@ mod tests {
 				.into(),
 			),
 		)
-		.unwrap();
+		.unwrap()
+		.hash();
 		let hash3 = block_on(
 			pool.submit_one(
 				&han_of_block0,
@@ -732,7 +743,8 @@ mod tests {
 				.into(),
 			),
 		)
-		.unwrap();
+		.unwrap()
+		.hash();
 
 		// when
 		pool.validated_pool.clear_stale(&api.expect_hash_and_number(5));
@@ -764,7 +776,8 @@ mod tests {
 				.into(),
 			),
 		)
-		.unwrap();
+		.unwrap()
+		.hash();
 
 		// when
 		block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![vec![0]], vec![hash1]));
@@ -792,8 +805,9 @@ mod tests {
 		let api = Arc::new(TestApi::default());
 		let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
 
-		let hash1 =
-			block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into())).unwrap();
+		let hash1 = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
+			.unwrap()
+			.hash();
 		assert_eq!(pool.validated_pool().status().future, 1);
 
 		// when
@@ -810,7 +824,8 @@ mod tests {
 				.into(),
 			),
 		)
-		.unwrap();
+		.unwrap()
+		.hash();
 
 		// then
 		assert_eq!(pool.validated_pool().status().future, 1);
@@ -842,6 +857,7 @@ mod tests {
 				.into(),
 			),
 		)
+		.map(|o| o.hash())
 		.unwrap_err();
 
 		// then
@@ -868,6 +884,7 @@ mod tests {
 				.into(),
 			),
 		)
+		.map(|o| o.hash())
 		.unwrap_err();
 
 		// then
@@ -896,7 +913,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 1);
 			assert_eq!(pool.validated_pool().status().future, 0);
 
@@ -933,7 +951,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 1);
 			assert_eq!(pool.validated_pool().status().future, 0);
 
@@ -972,7 +991,8 @@ mod tests {
 					.into(),
 				),
 			)
-			.unwrap();
+			.unwrap()
+			.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 0);
 			assert_eq!(pool.validated_pool().status().future, 1);
 
@@ -1011,7 +1031,8 @@ mod tests {
 			});
 			let watcher =
 				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
-					.unwrap();
+					.unwrap()
+					.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 1);
 
 			// when
@@ -1036,7 +1057,8 @@ mod tests {
 			});
 			let watcher =
 				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
-					.unwrap();
+					.unwrap()
+					.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 1);
 
 			// when
@@ -1069,7 +1091,8 @@ mod tests {
 			});
 			let watcher =
 				block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, xt.into()))
-					.unwrap();
+					.unwrap()
+					.expect_watcher();
 			assert_eq!(pool.validated_pool().status().ready, 1);
 
 			// when
@@ -1136,7 +1159,9 @@ mod tests {
 				// after validation `IncludeData` will have priority set to 9001
 				// (validate_transaction mock)
 				let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
-				block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())).unwrap();
+				block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
+					.unwrap()
+					.expect_watcher();
 				assert_eq!(pool.validated_pool().status().ready, 1);
 
 				// after validation `Transfer` will have priority set to 4 (validate_transaction
@@ -1147,8 +1172,9 @@ mod tests {
 					amount: 5,
 					nonce: 0,
 				});
-				let watcher =
-					block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())).unwrap();
+				let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
+					.unwrap()
+					.expect_watcher();
 				assert_eq!(pool.validated_pool().status().ready, 2);
 
 				// when
diff --git a/substrate/client/transaction-pool/src/graph/ready.rs b/substrate/client/transaction-pool/src/graph/ready.rs
index 9061d0e255811b5129ee1c54c28897d732f50368..b8aef99e638dccee26ed5dde4b7c0c738625c24b 100644
--- a/substrate/client/transaction-pool/src/graph/ready.rs
+++ b/substrate/client/transaction-pool/src/graph/ready.rs
@@ -232,12 +232,10 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
 		Ok(replaced)
 	}
 
-	/// Fold a list of ready transactions to compute a single value.
-	pub fn fold<R, F: FnMut(Option<R>, &ReadyTx<Hash, Ex>) -> Option<R>>(
-		&mut self,
-		f: F,
-	) -> Option<R> {
-		self.ready.read().values().fold(None, f)
+	/// Fold a list of ready transactions to compute a single value, starting from the provided
+	/// initial accumulator value.
+	pub fn fold<R, F: FnMut(R, &ReadyTx<Hash, Ex>) -> R>(&self, init: R, f: F) -> R {
+		self.ready.read().values().fold(init, f)
 	}
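
As a quick illustration of the reworked `fold` signature (a sketch only; `ready` stands for any `ReadyTransactions` instance and is not part of this patch), the caller now supplies the initial accumulator instead of starting from `None`:

	// Count ready transactions by folding with an explicit initial value.
	let ready_count = ready.fold(0usize, |acc, _tx| acc + 1);
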
 
 	/// Returns true if given transaction is part of the queue.
diff --git a/substrate/client/transaction-pool/src/graph/tracked_map.rs b/substrate/client/transaction-pool/src/graph/tracked_map.rs
index 6c3bbbf34b553e3e03f47d11317cd9f52aafc847..fe15c6eca308084f18261b8d7ebc003509d3d735 100644
--- a/substrate/client/transaction-pool/src/graph/tracked_map.rs
+++ b/substrate/client/transaction-pool/src/graph/tracked_map.rs
@@ -173,6 +173,11 @@ where
 	pub fn len(&mut self) -> usize {
 		self.inner_guard.len()
 	}
+
+	/// Returns an iterator over all key-value pairs.
+	pub fn iter(&self) -> Iter<'_, K, V> {
+		self.inner_guard.iter()
+	}
 }
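
A minimal usage sketch for the new `iter` accessor (illustrative only; `guard` stands for the same guard type that exposes `len` above, obtained from a `TrackedMap`):

	// Borrow every (key, value) pair without mutating the map, e.g. to collect the keys.
	let keys: Vec<_> = guard.iter().map(|(key, _value)| key).collect();
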
 
 #[cfg(test)]
diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs
index 3f7bf4773de7b2654a794382e4656d4de09ed702..bc2b07896dba09b51c4cff506883c89d37e702be 100644
--- a/substrate/client/transaction-pool/src/graph/validated_pool.rs
+++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs
@@ -18,25 +18,22 @@
 
 use std::{
 	collections::{HashMap, HashSet},
-	hash,
 	sync::Arc,
 };
 
 use crate::{common::log_xt::log_xt_trace, LOG_TARGET};
 use futures::channel::mpsc::{channel, Sender};
 use parking_lot::{Mutex, RwLock};
-use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions};
-use serde::Serialize;
+use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority};
 use sp_blockchain::HashAndNumber;
 use sp_runtime::{
-	traits::{self, SaturatedConversion},
+	traits::SaturatedConversion,
 	transaction_validity::{TransactionTag as Tag, ValidTransaction},
 };
 use std::time::Instant;
 
 use super::{
 	base_pool::{self as base, PruneStatus},
-	listener::Listener,
 	pool::{
 		BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor,
 	},
@@ -79,12 +76,23 @@ impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
 			valid_till: at.saturated_into::<u64>().saturating_add(validity.longevity),
 		})
 	}
+
+	/// Returns the priority of a valid transaction, or `None` if the transaction is not valid.
+	pub fn priority(&self) -> Option<TransactionPriority> {
+		match self {
+			ValidatedTransaction::Valid(base::Transaction { priority, .. }) => Some(*priority),
+			_ => None,
+		}
+	}
 }
 
-/// A type of validated transaction stored in the pool.
+/// A type alias for a validated transaction stored in the validated pool.
 pub type ValidatedTransactionFor<B> =
 	ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>;
 
+/// A type alias representing the `ValidatedPool` listener for a given `ChainApi` type.
+pub type Listener<B> = super::listener::Listener<ExtrinsicHash<B>, B>;
+
 /// A closure that returns true if the local node is a validator that can author blocks.
 #[derive(Clone)]
 pub struct IsValidator(Arc<Box<dyn Fn() -> bool + Send + Sync>>);
@@ -101,12 +109,56 @@ impl From<Box<dyn Fn() -> bool + Send + Sync>> for IsValidator {
 	}
 }
 
+/// Represents the result of `submit` or `submit_and_watch` operations.
+pub struct BaseSubmitOutcome<B: ChainApi, W> {
+	/// The hash of the submitted transaction.
+	hash: ExtrinsicHash<B>,
+	/// A transaction watcher. This is `Some` for `submit_and_watch` and `None` for `submit`.
+	watcher: Option<W>,
+
+	/// The priority of the transaction. Defaults to `None` if unknown.
+	priority: Option<TransactionPriority>,
+}
+
+/// Type alias for the outcome of a submission to the `ValidatedPool`.
+pub type ValidatedPoolSubmitOutcome<B> =
+	BaseSubmitOutcome<B, Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>>;
+
+impl<B: ChainApi, W> BaseSubmitOutcome<B, W> {
+	/// Creates a new instance with the given hash and priority.
+	pub fn new(hash: ExtrinsicHash<B>, priority: Option<TransactionPriority>) -> Self {
+		Self { hash, priority, watcher: None }
+	}
+
+	/// Sets the transaction watcher.
+	pub fn with_watcher(mut self, watcher: W) -> Self {
+		self.watcher = Some(watcher);
+		self
+	}
+
+	/// Provides the priority of the submitted transaction.
+	pub fn priority(&self) -> Option<TransactionPriority> {
+		self.priority
+	}
+
+	/// Provides the hash of the submitted transaction.
+	pub fn hash(&self) -> ExtrinsicHash<B> {
+		self.hash
+	}
+
+	/// Provides the watcher. Should only be called on outcomes of `submit_and_watch`; otherwise
+	/// it will panic (which would indicate a logic error in the program).
+	pub fn expect_watcher(&mut self) -> W {
+		self.watcher.take().expect("watcher was set in submit_and_watch. qed")
+	}
+}
+
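A sketch of how a caller might consume a `ValidatedPoolSubmitOutcome` returned by `submit_and_watch` (the call site and variable names are illustrative, not part of this patch):

	// `outcome` is the success value of `ValidatedPool::submit_and_watch`.
	let mut outcome = validated_pool.submit_and_watch(validated_tx)?;
	let tx_hash = outcome.hash();           // hash of the submitted transaction
	let priority = outcome.priority();      // Some(..) for a transaction that passed validation
	let watcher = outcome.expect_watcher(); // would panic on a plain `submit` outcome
	let events = watcher.into_stream();
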
 /// Pool that deals with validated transactions.
 pub struct ValidatedPool<B: ChainApi> {
 	api: Arc<B>,
 	is_validator: IsValidator,
 	options: Options,
-	listener: RwLock<Listener<ExtrinsicHash<B>, B>>,
+	listener: RwLock<Listener<B>>,
 	pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>,
 	import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
 	rotator: PoolRotator<ExtrinsicHash<B>>,
@@ -200,7 +252,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 	pub fn submit(
 		&self,
 		txs: impl IntoIterator<Item = ValidatedTransactionFor<B>>,
-	) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
+	) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
 		let results = txs
 			.into_iter()
 			.map(|validated_tx| self.submit_one(validated_tx))
@@ -216,7 +268,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 		results
 			.into_iter()
 			.map(|res| match res {
-				Ok(ref hash) if removed.contains(hash) =>
+				Ok(outcome) if removed.contains(&outcome.hash) =>
 					Err(error::Error::ImmediatelyDropped.into()),
 				other => other,
 			})
@@ -224,9 +276,13 @@ impl<B: ChainApi> ValidatedPool<B> {
 	}
 
 	/// Submit single pre-validated transaction to the pool.
-	fn submit_one(&self, tx: ValidatedTransactionFor<B>) -> Result<ExtrinsicHash<B>, B::Error> {
+	fn submit_one(
+		&self,
+		tx: ValidatedTransactionFor<B>,
+	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
 		match tx {
 			ValidatedTransaction::Valid(tx) => {
+				let priority = tx.priority;
 				log::trace!(target: LOG_TARGET, "[{:?}] ValidatedPool::submit_one", tx.hash);
 				if !tx.propagate && !(self.is_validator.0)() {
 					return Err(error::Error::Unactionable.into())
@@ -254,7 +310,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 
 				let mut listener = self.listener.write();
 				fire_events(&mut *listener, &imported);
-				Ok(*imported.hash())
+				Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority)))
 			},
 			ValidatedTransaction::Invalid(hash, err) => {
 				log::trace!(target: LOG_TARGET, "[{:?}] ValidatedPool::submit_one invalid: {:?}", hash, err);
@@ -305,7 +361,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 			// run notifications
 			let mut listener = self.listener.write();
 			for h in &removed {
-				listener.limit_enforced(h);
+				listener.limits_enforced(h);
 			}
 
 			removed
@@ -318,7 +374,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 	pub fn submit_and_watch(
 		&self,
 		tx: ValidatedTransactionFor<B>,
-	) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
+	) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
 		match tx {
 			ValidatedTransaction::Valid(tx) => {
 				let hash = self.api.hash_and_length(&tx.data).0;
@@ -326,7 +382,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 				self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
 					.pop()
 					.expect("One extrinsic passed; one result returned; qed")
-					.map(|_| watcher)
+					.map(|outcome| outcome.with_watcher(watcher))
 			},
 			ValidatedTransaction::Invalid(hash, err) => {
 				self.rotator.ban(&Instant::now(), std::iter::once(hash));
@@ -711,11 +767,42 @@ impl<B: ChainApi> ValidatedPool<B> {
 			listener.future(&f.hash);
 		});
 	}
+
+	/// Removes a transaction subtree from the pool, starting from the given transaction hash.
+	///
+	/// This function traverses the dependency graph of transactions and removes the specified
+	/// transaction along with all its descendant transactions from the pool.
+	///
+	/// A `listener_action` callback function is invoked for every transaction that is removed,
+	/// providing a reference to the pool's listener and the hash of the removed transaction. This
+	/// allows the caller to trigger the required events.
+	///
+	/// Returns a vector containing the hashes of all removed transactions, including the root
+	/// transaction specified by `tx_hash`.
+	pub fn remove_subtree<F>(
+		&self,
+		tx_hash: ExtrinsicHash<B>,
+		listener_action: F,
+	) -> Vec<ExtrinsicHash<B>>
+	where
+		F: Fn(&mut Listener<B>, ExtrinsicHash<B>),
+	{
+		self.pool
+			.write()
+			.remove_subtree(&[tx_hash])
+			.into_iter()
+			.map(|tx| {
+				let removed_tx_hash = tx.hash;
+				let mut listener = self.listener.write();
+				listener_action(&mut *listener, removed_tx_hash);
+				removed_tx_hash
+			})
+			.collect::<Vec<_>>()
+	}
 }
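
A sketch of how `remove_subtree` might be driven (only `remove_subtree` and the `limits_enforced` listener notification are taken from this patch; the surrounding call site is illustrative):

	// Remove `tx_hash` together with every transaction depending on it, and notify
	// watchers that each removal was caused by enforcing pool limits.
	let removed_hashes = validated_pool.remove_subtree(tx_hash, |listener, removed_tx_hash| {
		listener.limits_enforced(&removed_tx_hash)
	});
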
 
-fn fire_events<H, B, Ex>(listener: &mut Listener<H, B>, imported: &base::Imported<H, Ex>)
+fn fire_events<B, Ex>(listener: &mut Listener<B>, imported: &base::Imported<ExtrinsicHash<B>, Ex>)
 where
-	H: hash::Hash + Eq + traits::Member + Serialize,
 	B: ChainApi,
 {
 	match *imported {
diff --git a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
index caa09585b28bff894ab7c2476facad55761b2f69..2a691ae35eaf78549ca145d41d933a37edd64d1d 100644
--- a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
+++ b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
@@ -405,7 +405,8 @@ mod tests {
 			TimedTransactionSource::new_external(false),
 			uxt.clone().into(),
 		))
-		.expect("Should be valid");
+		.expect("Should be valid")
+		.hash();
 
 		block_on(queue.revalidate_later(han_of_block0.hash, vec![uxt_hash]));
 
@@ -448,7 +449,7 @@ mod tests {
 				vec![(source.clone(), uxt0.into()), (source, uxt1.into())],
 			))
 			.into_iter()
-			.map(|r| r.expect("Should be valid"))
+			.map(|r| r.expect("Should be valid").hash())
 			.collect::<Vec<_>>();
 
 		assert_eq!(api.validation_requests().len(), 2);
diff --git a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs
index 2b32704945c759f4ae7a6f473a54e5098d6685ad..3598f9dbc2af15f3b56cb8ccd8fce91777716abc 100644
--- a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs
+++ b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs
@@ -274,7 +274,12 @@ where
 
 		let number = self.api.resolve_block_number(at);
 		let at = HashAndNumber { hash: at, number: number? };
-		Ok(pool.submit_at(&at, xts).await)
+		Ok(pool
+			.submit_at(&at, xts)
+			.await
+			.into_iter()
+			.map(|result| result.map(|outcome| outcome.hash()))
+			.collect())
 	}
 
 	async fn submit_one(
@@ -292,6 +297,7 @@ where
 		let at = HashAndNumber { hash: at, number: number? };
 		pool.submit_one(&at, TimedTransactionSource::from_transaction_source(source, false), xt)
 			.await
+			.map(|outcome| outcome.hash())
 	}
 
 	async fn submit_and_watch(
@@ -308,15 +314,13 @@ where
 		let number = self.api.resolve_block_number(at);
 
 		let at = HashAndNumber { hash: at, number: number? };
-		let watcher = pool
-			.submit_and_watch(
-				&at,
-				TimedTransactionSource::from_transaction_source(source, false),
-				xt,
-			)
-			.await?;
-
-		Ok(watcher.into_stream().boxed())
+		pool.submit_and_watch(
+			&at,
+			TimedTransactionSource::from_transaction_source(source, false),
+			xt,
+		)
+		.await
+		.map(|mut outcome| outcome.expect_watcher().into_stream().boxed())
 	}
 
 	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
@@ -484,7 +488,11 @@ where
 			validity,
 		);
 
-		self.pool.validated_pool().submit(vec![validated]).remove(0)
+		self.pool
+			.validated_pool()
+			.submit(vec![validated])
+			.remove(0)
+			.map(|outcome| outcome.hash())
 	}
 }
 
diff --git a/substrate/client/transaction-pool/tests/fatp_common/mod.rs b/substrate/client/transaction-pool/tests/fatp_common/mod.rs
index aaffebc0db0ac15d5443f6e1239ba8015ab409a9..530c25caf88e7973b6c877f8370fc77fa1a203b2 100644
--- a/substrate/client/transaction-pool/tests/fatp_common/mod.rs
+++ b/substrate/client/transaction-pool/tests/fatp_common/mod.rs
@@ -192,12 +192,9 @@ macro_rules! assert_ready_iterator {
 		let output: Vec<_> = ready_iterator.collect();
 		log::debug!(target:LOG_TARGET, "expected: {:#?}", expected);
 		log::debug!(target:LOG_TARGET, "output: {:#?}", output);
+		let output = output.into_iter().map(|t| t.hash).collect::<Vec<_>>();
 		assert_eq!(expected.len(), output.len());
-		assert!(
-			output.iter().zip(expected.iter()).all(|(o,e)| {
-				o.hash == *e
-			})
-		);
+		assert_eq!(output, expected);
 	}};
 }
 
@@ -215,6 +212,18 @@ macro_rules! assert_future_iterator {
 	}};
 }
 
+#[macro_export]
+macro_rules! assert_watcher_stream {
+	($stream:ident, [$( $event:expr ),*]) => {{
+		let expected = vec![ $($event),*];
+		log::debug!(target:LOG_TARGET, "expected: {:#?} (len: {})", expected, expected.len());
+		let output = futures::executor::block_on_stream($stream).take(expected.len()).collect::<Vec<_>>();
+		log::debug!(target:LOG_TARGET, "output: {:#?}", output);
+		assert_eq!(expected.len(), output.len());
+		assert_eq!(output, expected);
+	}};
+}
+
 pub const SOURCE: TransactionSource = TransactionSource::External;
 
 #[cfg(test)]
diff --git a/substrate/client/transaction-pool/tests/fatp_prios.rs b/substrate/client/transaction-pool/tests/fatp_prios.rs
index 4ed9b450386143c02557cd73a8437364ea48117b..af5e7e8c5a6a83269ff7532d750d8f036e77397b 100644
--- a/substrate/client/transaction-pool/tests/fatp_prios.rs
+++ b/substrate/client/transaction-pool/tests/fatp_prios.rs
@@ -20,13 +20,15 @@
 
 pub mod fatp_common;
 
-use fatp_common::{new_best_block_event, TestPoolBuilder, LOG_TARGET, SOURCE};
+use fatp_common::{invalid_hash, new_best_block_event, TestPoolBuilder, LOG_TARGET, SOURCE};
 use futures::{executor::block_on, FutureExt};
 use sc_transaction_pool::ChainApi;
-use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool, TransactionStatus};
+use sc_transaction_pool_api::{
+	error::Error as TxPoolError, LocalTransactionPool, MaintainedTransactionPool, TransactionPool,
+	TransactionStatus,
+};
 use substrate_test_runtime_client::Sr25519Keyring::*;
 use substrate_test_runtime_transaction_pool::uxt;
-
 #[test]
 fn fatp_prio_ready_higher_evicts_lower() {
 	sp_tracing::try_init_simple();
@@ -247,3 +249,312 @@ fn fatp_prio_watcher_future_lower_prio_gets_dropped_from_all_views() {
 	assert_ready_iterator!(header01.hash(), pool, [xt2, xt1]);
 	assert_ready_iterator!(header02.hash(), pool, [xt2, xt1]);
 }
+
+#[test]
+fn fatp_prios_watcher_full_mempool_higher_prio_is_accepted() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+	api.set_nonce(api.genesis_hash(), Eve.into(), 600);
+	api.set_nonce(api.genesis_hash(), Ferdie.into(), 700);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+
+	let xt3 = uxt(Dave, 500);
+
+	let xt4 = uxt(Eve, 600);
+	let xt5 = uxt(Ferdie, 700);
+
+	api.set_priority(&xt0, 1);
+	api.set_priority(&xt1, 2);
+	api.set_priority(&xt2, 3);
+	api.set_priority(&xt3, 4);
+
+	api.set_priority(&xt4, 5);
+	api.set_priority(&xt5, 6);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 2);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));
+
+	let _xt2_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let _xt3_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_pool_status!(header02.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 4);
+
+	let header03 = api.push_block_with_parent(header02.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header02.hash()), header03.hash())));
+
+	let _xt4_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap();
+	let _xt5_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt5.clone())).unwrap();
+
+	assert_pool_status!(header03.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 4);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+
+	assert_ready_iterator!(header01.hash(), pool, []);
+	assert_ready_iterator!(header02.hash(), pool, [xt3, xt2]);
+	assert_ready_iterator!(header03.hash(), pool, [xt5, xt4]);
+}
+
+#[test]
+fn fatp_prios_watcher_full_mempool_higher_prio_is_accepted_with_subtree() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(4).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Alice, 201);
+	let xt2 = uxt(Alice, 202);
+	let xt3 = uxt(Bob, 300);
+	let xt4 = uxt(Charlie, 400);
+
+	api.set_priority(&xt0, 1);
+	api.set_priority(&xt1, 3);
+	api.set_priority(&xt2, 3);
+	api.set_priority(&xt3, 2);
+	api.set_priority(&xt4, 2);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_ready_iterator!(header01.hash(), pool, [xt3, xt0, xt1, xt2]);
+	assert_pool_status!(header01.hash(), &pool, 4, 0);
+	assert_eq!(pool.mempool_len().1, 4);
+
+	let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap();
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt3, xt4]);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt4_watcher, [TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_prios_watcher_full_mempool_higher_prio_is_accepted_with_subtree2() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(4).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Alice, 201);
+	let xt2 = uxt(Alice, 202);
+	let xt3 = uxt(Bob, 300);
+	let xt4 = uxt(Charlie, 400);
+
+	api.set_priority(&xt0, 1);
+	api.set_priority(&xt1, 3);
+	api.set_priority(&xt2, 3);
+	api.set_priority(&xt3, 2);
+	api.set_priority(&xt4, 2);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_ready_iterator!(header01.hash(), pool, [xt3, xt0, xt1, xt2]);
+	assert_pool_status!(header01.hash(), &pool, 4, 0);
+	assert_eq!(pool.mempool_len().1, 4);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));
+
+	let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap();
+	assert_ready_iterator!(header01.hash(), pool, [xt3]);
+	assert_pool_status!(header02.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header02.hash(), pool, [xt3, xt4]);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt4_watcher, [TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_prios_watcher_full_mempool_lower_prio_gets_rejected() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(2).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+	let xt3 = uxt(Dave, 500);
+
+	api.set_priority(&xt0, 2);
+	api.set_priority(&xt1, 2);
+	api.set_priority(&xt2, 2);
+	api.set_priority(&xt3, 1);
+
+	let _xt0_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let _xt1_watcher =
+		block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 2);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));
+
+	assert_pool_status!(header02.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 2);
+
+	assert_ready_iterator!(header01.hash(), pool, [xt0, xt1]);
+	assert_ready_iterator!(header02.hash(), pool, [xt0, xt1]);
+
+	let result2 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).map(|_| ());
+	assert!(matches!(result2.as_ref().unwrap_err().0, TxPoolError::ImmediatelyDropped));
+	let result3 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).map(|_| ());
+	assert!(matches!(result3.as_ref().unwrap_err().0, TxPoolError::ImmediatelyDropped));
+}
+
+#[test]
+fn fatp_prios_watcher_full_mempool_does_not_keep_dropped_transaction() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+	let xt3 = uxt(Dave, 500);
+
+	api.set_priority(&xt0, 2);
+	api.set_priority(&xt1, 2);
+	api.set_priority(&xt2, 2);
+	api.set_priority(&xt3, 2);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]);
+
+	assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]);
+	assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_prios_submit_local_full_mempool_higher_prio_is_accepted() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+	api.set_nonce(api.genesis_hash(), Eve.into(), 600);
+	api.set_nonce(api.genesis_hash(), Ferdie.into(), 700);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+
+	let xt3 = uxt(Dave, 500);
+
+	let xt4 = uxt(Eve, 600);
+	let xt5 = uxt(Ferdie, 700);
+
+	api.set_priority(&xt0, 1);
+	api.set_priority(&xt1, 2);
+	api.set_priority(&xt2, 3);
+	api.set_priority(&xt3, 4);
+
+	api.set_priority(&xt4, 5);
+	api.set_priority(&xt5, 6);
+	pool.submit_local(invalid_hash(), xt0.clone()).unwrap();
+	pool.submit_local(invalid_hash(), xt1.clone()).unwrap();
+
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().0, 2);
+
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));
+
+	pool.submit_local(invalid_hash(), xt2.clone()).unwrap();
+	pool.submit_local(invalid_hash(), xt3.clone()).unwrap();
+
+	assert_pool_status!(header02.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().0, 4);
+
+	let header03 = api.push_block_with_parent(header02.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header02.hash()), header03.hash())));
+
+	pool.submit_local(invalid_hash(), xt4.clone()).unwrap();
+	pool.submit_local(invalid_hash(), xt5.clone()).unwrap();
+
+	assert_pool_status!(header03.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().0, 4);
+
+	assert_ready_iterator!(header01.hash(), pool, []);
+	assert_ready_iterator!(header02.hash(), pool, [xt3, xt2]);
+	assert_ready_iterator!(header03.hash(), pool, [xt5, xt4]);
+}
diff --git a/substrate/client/transaction-pool/tests/pool.rs b/substrate/client/transaction-pool/tests/pool.rs
index de35726435f0f1c53bbc1e4a0cd758f8121b0530..c70f4548331454f166e8112df00e42e1652e1fb6 100644
--- a/substrate/client/transaction-pool/tests/pool.rs
+++ b/substrate/client/transaction-pool/tests/pool.rs
@@ -158,6 +158,7 @@ fn prune_tags_should_work() {
 	let (pool, api) = pool();
 	let hash209 =
 		block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt(Alice, 209).into()))
+			.map(|o| o.hash())
 			.unwrap();
 	block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt(Alice, 210).into()))
 		.unwrap();
@@ -184,10 +185,13 @@ fn prune_tags_should_work() {
 fn should_ban_invalid_transactions() {
 	let (pool, api) = pool();
 	let uxt = Arc::from(uxt(Alice, 209));
-	let hash =
-		block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())).unwrap();
+	let hash = block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone()))
+		.unwrap()
+		.hash();
 	pool.validated_pool().remove_invalid(&[hash]);
-	block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())).unwrap_err();
+	block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone()))
+		.map(|_| ())
+		.unwrap_err();
 
 	// when
 	let pending: Vec<_> = pool
@@ -198,7 +202,9 @@ fn should_ban_invalid_transactions() {
 	assert_eq!(pending, Vec::<Nonce>::new());
 
 	// then
-	block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())).unwrap_err();
+	block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone()))
+		.map(|_| ())
+		.unwrap_err();
 }
 
 #[test]
diff --git a/substrate/test-utils/runtime/transaction-pool/src/lib.rs b/substrate/test-utils/runtime/transaction-pool/src/lib.rs
index 93e5855eefc6c0bad3b805cb63e73b12ab630b1e..f88694fb1071e231ac5344156cca634a19d1614d 100644
--- a/substrate/test-utils/runtime/transaction-pool/src/lib.rs
+++ b/substrate/test-utils/runtime/transaction-pool/src/lib.rs
@@ -352,9 +352,18 @@ impl ChainApi for TestApi {
 	fn validate_transaction(
 		&self,
 		at: <Self::Block as BlockT>::Hash,
-		_source: TransactionSource,
+		source: TransactionSource,
 		uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
 	) -> Self::ValidationFuture {
+		ready(self.validate_transaction_blocking(at, source, uxt))
+	}
+
+	fn validate_transaction_blocking(
+		&self,
+		at: <Self::Block as BlockT>::Hash,
+		_source: TransactionSource,
+		uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
+	) -> Result<TransactionValidity, Error> {
 		let uxt = (*uxt).clone();
 		self.validation_requests.write().push(uxt.clone());
 		let block_number;
@@ -374,16 +383,12 @@ impl ChainApi for TestApi {
 				// the transaction. (This is not required for this test function, but in real
 				// environment it would fail because of this).
 				if !found_best {
-					return ready(Ok(Err(TransactionValidityError::Invalid(
-						InvalidTransaction::Custom(1),
-					))))
+					return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(1))))
 				}
 			},
 			Ok(None) =>
-				return ready(Ok(Err(TransactionValidityError::Invalid(
-					InvalidTransaction::Custom(2),
-				)))),
-			Err(e) => return ready(Err(e)),
+				return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(2)))),
+			Err(e) => return Err(e),
 		}
 
 		let (requires, provides) = if let Ok(transfer) = TransferData::try_from(&uxt) {
@@ -423,7 +428,7 @@ impl ChainApi for TestApi {
 
 			if self.enable_stale_check && transfer.nonce < chain_nonce {
 				log::info!("test_api::validate_transaction: invalid_transaction(stale)....");
-				return ready(Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Stale))))
+				return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Stale)))
 			}
 
 			(requires, provides)
@@ -433,7 +438,7 @@ impl ChainApi for TestApi {
 
 		if self.chain.read().invalid_hashes.contains(&self.hash_and_length(&uxt).0) {
 			log::info!("test_api::validate_transaction: invalid_transaction....");
-			return ready(Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0)))))
+			return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0))))
 		}
 
 		let priority = self.chain.read().priorities.get(&self.hash_and_length(&uxt).0).cloned();
@@ -447,16 +452,7 @@ impl ChainApi for TestApi {
 
 		(self.valid_modifier.read())(&mut validity);
 
-		ready(Ok(Ok(validity)))
-	}
-
-	fn validate_transaction_blocking(
-		&self,
-		_at: <Self::Block as BlockT>::Hash,
-		_source: TransactionSource,
-		_uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
-	) -> Result<TransactionValidity, Error> {
-		unimplemented!();
+		Ok(Ok(validity))
 	}
 
 	fn block_id_to_number(