diff --git a/prdoc/pr_6262.prdoc b/prdoc/pr_6262.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..8ad99bc6ad2867c3499938f882851e78201c648f
--- /dev/null
+++ b/prdoc/pr_6262.prdoc
@@ -0,0 +1,10 @@
+title: "Size limits implemented for fork aware transaction pool"
+
+doc:
+  - audience: Node Dev
+    description: |
+      Size limits are now obeyed in the fork-aware transaction pool
+
+crates:
+  - name: sc-transaction-pool
+    bump: minor
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
index 2dd5836c570f419674ffe07d2c2f7aab7ab6cc4d..ecae21395c9164b2e11b8361e7e79512219cf111 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
@@ -68,15 +68,7 @@ where
 	AddView(BlockHash<C>, ViewStream<C>),
 	/// Removes an existing view's stream associated with a specific block hash.
 	RemoveView(BlockHash<C>),
-	/// Adds initial views for given extrinsics hashes.
-	///
-	/// This message should be sent when the external submission of a transaction occures. It
-	/// provides the list of initial views for given extrinsics hashes.
-	/// The dropped notification is not sent if it comes from the initial views. It allows to keep
-	/// transaction in the mempool, even if all the views are full at the time of submitting
-	/// transaction to the pool.
-	AddInitialViews(Vec<ExtrinsicHash<C>>, BlockHash<C>),
-	/// Removes all initial views for given extrinsic hashes.
+	/// Removes the internal state kept for the given extrinsic hashes.
 	///
 	/// Intended to be called on finalization.
 	RemoveFinalizedTxs(Vec<ExtrinsicHash<C>>),
@@ -90,7 +82,6 @@ where
 		match self {
 			Command::AddView(..) => write!(f, "AddView"),
 			Command::RemoveView(..) => write!(f, "RemoveView"),
-			Command::AddInitialViews(..) => write!(f, "AddInitialViews"),
 			Command::RemoveFinalizedTxs(..) => write!(f, "RemoveFinalizedTxs"),
 		}
 	}
@@ -118,13 +109,6 @@ where
 	///
 	/// Once a transaction is dropped, the dropping view is removed from the set.
 	transaction_states: HashMap<ExtrinsicHash<C>, HashSet<BlockHash<C>>>,
-
-	/// The list of initial view for every extrinsic.
-	///
-	/// Dropped notifications from initial views will be silenced. This allows to accept the
-	/// transaction into the mempool, even if all the views are full at the time of submitting new
-	/// transaction.
-	initial_views: HashMap<ExtrinsicHash<C>, HashSet<BlockHash<C>>>,
 }
 
 impl<C> MultiViewDropWatcherContext<C>
@@ -164,15 +148,7 @@ where
 							.iter()
 							.all(|h| !self.stream_map.contains_key(h))
 					{
-						return self
-							.initial_views
-							.get(&tx_hash)
-							.map(|list| !list.contains(&block_hash))
-							.unwrap_or(true)
-							.then(|| {
-								debug!("[{:?}] dropped_watcher: removing tx", tx_hash);
-								tx_hash
-							})
+						return Some(tx_hash)
 					}
 				} else {
 					debug!("[{:?}] dropped_watcher: removing (non-tracked) tx", tx_hash);
@@ -201,7 +177,6 @@ where
 			stream_map: StreamMap::new(),
 			command_receiver,
 			transaction_states: Default::default(),
-			initial_views: Default::default(),
 		};
 
 		let stream_map = futures::stream::unfold(ctx, |mut ctx| async move {
@@ -217,17 +192,13 @@ where
 							Command::RemoveView(key) => {
 								trace!(target: LOG_TARGET,"dropped_watcher: Command::RemoveView {key:?} views:{:?}",ctx.stream_map.keys().collect::<Vec<_>>());
 								ctx.stream_map.remove(&key);
-							},
-							Command::AddInitialViews(xts,block_hash) => {
-								log_xt_trace!(target: LOG_TARGET, xts.clone(), "[{:?}] dropped_watcher: xt initial view added {block_hash:?}");
-								xts.into_iter().for_each(|xt| {
-									ctx.initial_views.entry(xt).or_default().insert(block_hash);
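+								// Also drop the removed view from every transaction's state set so the
+								// bookkeeping stays in sync with the currently active views.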
+								ctx.transaction_states.iter_mut().for_each(|(_,state)| {
+									state.remove(&key);
 								});
 							},
 							Command::RemoveFinalizedTxs(xts) => {
 								log_xt_trace!(target: LOG_TARGET, xts.clone(), "[{:?}] dropped_watcher: finalized xt removed");
 								xts.iter().for_each(|xt| {
-									ctx.initial_views.remove(xt);
 									ctx.transaction_states.remove(xt);
 								});
 
@@ -291,34 +262,13 @@ where
 		});
 	}
 
-	/// Adds the initial view for the given transactions hashes.
-	///
-	/// This message should be called when the external submission of a transaction occures. It
-	/// provides the list of initial views for given extrinsics hashes.
-	///
-	/// The dropped notification is not sent if it comes from the initial views. It allows to keep
-	/// transaction in the mempool, even if all the views are full at the time of submitting
-	/// transaction to the pool.
-	pub fn add_initial_views(
-		&self,
-		xts: impl IntoIterator<Item = ExtrinsicHash<C>> + Clone,
-		block_hash: BlockHash<C>,
-	) {
-		let _ = self
-			.controller
-			.unbounded_send(Command::AddInitialViews(xts.into_iter().collect(), block_hash))
-			.map_err(|e| {
-				trace!(target: LOG_TARGET, "dropped_watcher: add_initial_views_ send message failed: {e}");
-			});
-	}
-
-	/// Removes all initial views for finalized transactions.
+	/// Removes status info for finalized transactions.
 	pub fn remove_finalized_txs(&self, xts: impl IntoIterator<Item = ExtrinsicHash<C>> + Clone) {
 		let _ = self
 			.controller
 			.unbounded_send(Command::RemoveFinalizedTxs(xts.into_iter().collect()))
 			.map_err(|e| {
-				trace!(target: LOG_TARGET, "dropped_watcher: remove_initial_views send message failed: {e}");
+				trace!(target: LOG_TARGET, "dropped_watcher: remove_finalized_txs send message failed: {e}");
 			});
 	}
 }
@@ -471,63 +421,4 @@ mod dropped_watcher_tests {
 		let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
 		assert_eq!(handle.await.unwrap(), vec![tx_hash]);
 	}
-
-	#[tokio::test]
-	async fn test06() {
-		sp_tracing::try_init_simple();
-		let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
-		assert!(output_stream.next().now_or_never().is_none());
-
-		let block_hash0 = H256::repeat_byte(0x01);
-		let block_hash1 = H256::repeat_byte(0x02);
-		let tx_hash = H256::repeat_byte(0x0b);
-
-		let view_stream0 = futures::stream::iter(vec![
-			(tx_hash, TransactionStatus::Future),
-			(tx_hash, TransactionStatus::InBlock((block_hash1, 0))),
-		])
-		.boxed();
-		watcher.add_view(block_hash0, view_stream0);
-		assert!(output_stream.next().now_or_never().is_none());
-
-		let view_stream1 = futures::stream::iter(vec![
-			(tx_hash, TransactionStatus::Ready),
-			(tx_hash, TransactionStatus::Dropped),
-		])
-		.boxed();
-
-		watcher.add_view(block_hash1, view_stream1);
-		watcher.add_initial_views(vec![tx_hash], block_hash1);
-		assert!(output_stream.next().now_or_never().is_none());
-	}
-
-	#[tokio::test]
-	async fn test07() {
-		sp_tracing::try_init_simple();
-		let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
-		assert!(output_stream.next().now_or_never().is_none());
-
-		let block_hash0 = H256::repeat_byte(0x01);
-		let block_hash1 = H256::repeat_byte(0x02);
-		let tx_hash = H256::repeat_byte(0x0b);
-
-		let view_stream0 = futures::stream::iter(vec![
-			(tx_hash, TransactionStatus::Future),
-			(tx_hash, TransactionStatus::InBlock((block_hash1, 0))),
-		])
-		.boxed();
-		watcher.add_view(block_hash0, view_stream0);
-		watcher.add_initial_views(vec![tx_hash], block_hash0);
-		assert!(output_stream.next().now_or_never().is_none());
-
-		let view_stream1 = futures::stream::iter(vec![
-			(tx_hash, TransactionStatus::Ready),
-			(tx_hash, TransactionStatus::Dropped),
-		])
-		.boxed();
-		watcher.add_view(block_hash1, view_stream1);
-
-		let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
-		assert_eq!(handle.await.unwrap(), vec![tx_hash]);
-	}
 }
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
index 7e72b44adf38b7ebb245874a50ea6c4067a9178c..a342d35b2844432e6ac11fd6f20527968a84b714 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
@@ -45,7 +45,6 @@ use futures::{
 use parking_lot::Mutex;
 use prometheus_endpoint::Registry as PrometheusRegistry;
 use sc_transaction_pool_api::{
-	error::{Error, IntoPoolError},
 	ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus,
 	TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
 };
@@ -193,6 +192,7 @@ where
 			listener.clone(),
 			Default::default(),
 			mempool_max_transactions_count,
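+			// The mempool byte limit is the sum of the ready and future byte limits.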
+			ready_limits.total_bytes + future_limits.total_bytes,
 		));
 
 		let (dropped_stream_controller, dropped_stream) =
@@ -283,6 +283,7 @@ where
 			listener.clone(),
 			metrics.clone(),
 			TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * (options.ready.count + options.future.count),
+			options.ready.total_bytes + options.future.total_bytes,
 		));
 
 		let (dropped_stream_controller, dropped_stream) =
@@ -599,48 +600,36 @@ where
 		log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count());
 		log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at");
 		let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
-		let mempool_result = self.mempool.extend_unwatched(source, &xts);
+		let mempool_results = self.mempool.extend_unwatched(source, &xts);
 
 		if view_store.is_empty() {
-			return future::ready(Ok(mempool_result)).boxed()
+			return future::ready(Ok(mempool_results)).boxed()
 		}
 
-		let (hashes, to_be_submitted): (Vec<TxHash<Self>>, Vec<ExtrinsicFor<ChainApi>>) =
-			mempool_result
-				.iter()
-				.zip(xts)
-				.filter_map(|(result, xt)| result.as_ref().ok().map(|xt_hash| (xt_hash, xt)))
-				.unzip();
+		let to_be_submitted = mempool_results
+			.iter()
+			.zip(xts)
+			.filter_map(|(result, xt)| result.as_ref().ok().map(|_| xt))
+			.collect::<Vec<_>>();
 
 		self.metrics
 			.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));
 
 		let mempool = self.mempool.clone();
 		async move {
-			let results_map = view_store.submit(source, to_be_submitted.into_iter(), hashes).await;
+			let results_map = view_store.submit(source, to_be_submitted.into_iter()).await;
 			let mut submission_results = reduce_multiview_result(results_map).into_iter();
 
-			Ok(mempool_result
+			Ok(mempool_results
 				.into_iter()
 				.map(|result| {
 					result.and_then(|xt_hash| {
-						let result = submission_results
+						submission_results
 							.next()
-							.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.");
-						result.or_else(|error| {
-							let error = error.into_pool_error();
-							match error {
-								Ok(
-									// The transaction is still in mempool it may get included into the view for the next block.
-									Error::ImmediatelyDropped
-								) => Ok(xt_hash),
-								Ok(e) => {
-									mempool.remove(xt_hash);
-									Err(e.into())
-								},
-								Err(e) => Err(e),
-							}
-						})
+							.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.")
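+							// If the views rejected the transaction, remove it from the mempool as well.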
+							.inspect_err(|_|
+								mempool.remove(xt_hash)
+							)
 					})
 				})
 				.collect::<Vec<_>>())
@@ -692,26 +681,10 @@ where
 		let view_store = self.view_store.clone();
 		let mempool = self.mempool.clone();
 		async move {
-			let result = view_store.submit_and_watch(at, source, xt).await;
-			let result = result.or_else(|(e, maybe_watcher)| {
-				let error = e.into_pool_error();
-				match (error, maybe_watcher) {
-					(
-						Ok(
-							// The transaction is still in mempool it may get included into the
-							// view for the next block.
-							Error::ImmediatelyDropped,
-						),
-						Some(watcher),
-					) => Ok(watcher),
-					(Ok(e), _) => {
-						mempool.remove(xt_hash);
-						Err(e.into())
-					},
-					(Err(e), _) => Err(e),
-				}
-			});
-			result
+			view_store
+				.submit_and_watch(at, source, xt)
+				.await
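+				// Remove the transaction from the mempool if the views did not accept it.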
+				.inspect_err(|_| mempool.remove(xt_hash))
 		}
 		.boxed()
 	}
@@ -1056,7 +1029,7 @@ where
 		future::join_all(results).await
 	}
 
-	/// Updates the given view with the transaction from the internal mempol.
+	/// Updates the given view with the transactions from the internal mempool.
 	///
 	/// All transactions from the mempool (excluding those which are either already imported or
 	/// already included in blocks since recently finalized block) are submitted to the
@@ -1139,12 +1112,9 @@ where
 		// out the invalid event, and remove transaction.
 		if self.view_store.is_empty() {
 			for result in watched_results {
-				match result {
-					Err(tx_hash) => {
-						self.view_store.listener.invalidate_transactions(&[tx_hash]);
-						self.mempool.remove(tx_hash);
-					},
-					Ok(_) => {},
+				if let Err(tx_hash) = result {
+					self.view_store.listener.invalidate_transactions(&[tx_hash]);
+					self.mempool.remove(tx_hash);
 				}
 			}
 		}
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
index 989c7e8ef356e74af9c565f54ae4513034ea649c..86c07008c3f3b468fab59047b64ef9e5b5f4c6f4 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
@@ -30,12 +30,11 @@ use super::{metrics::MetricsLink as PrometheusMetrics, multi_view_listener::Mult
 use crate::{
 	common::log_xt::log_xt_trace,
 	graph,
-	graph::{ExtrinsicFor, ExtrinsicHash},
+	graph::{tracked_map::Size, ExtrinsicFor, ExtrinsicHash},
 	LOG_TARGET,
 };
 use futures::FutureExt;
 use itertools::Itertools;
-use parking_lot::RwLock;
 use sc_transaction_pool_api::TransactionSource;
 use sp_blockchain::HashAndNumber;
 use sp_runtime::{
@@ -43,7 +42,7 @@ use sp_runtime::{
 	transaction_validity::{InvalidTransaction, TransactionValidityError},
 };
 use std::{
-	collections::{hash_map::Entry, HashMap},
+	collections::HashMap,
 	sync::{atomic, atomic::AtomicU64, Arc},
 	time::Instant,
 };
@@ -72,6 +71,8 @@ where
 	watched: bool,
 	/// Extrinsic actual body.
 	tx: ExtrinsicFor<ChainApi>,
+	/// Size of the extrinsic's actual body.
+	bytes: usize,
 	/// Transaction source.
 	source: TransactionSource,
 	/// When the transaction was revalidated, used to periodically revalidate the mem pool buffer.
@@ -99,13 +100,13 @@ where
 	}
 
 	/// Creates a new instance of wrapper for unwatched transaction.
-	fn new_unwatched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>) -> Self {
-		Self { watched: false, tx, source, validated_at: AtomicU64::new(0) }
+	fn new_unwatched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>, bytes: usize) -> Self {
+		Self { watched: false, tx, source, validated_at: AtomicU64::new(0), bytes }
 	}
 
 	/// Creates a new instance of wrapper for watched transaction.
-	fn new_watched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>) -> Self {
-		Self { watched: true, tx, source, validated_at: AtomicU64::new(0) }
+	fn new_watched(source: TransactionSource, tx: ExtrinsicFor<ChainApi>, bytes: usize) -> Self {
+		Self { watched: true, tx, source, validated_at: AtomicU64::new(0), bytes }
 	}
 
 	/// Provides a clone of actual transaction body.
@@ -121,10 +122,18 @@ where
 	}
 }
 
+impl<ChainApi, Block> Size for Arc<TxInMemPool<ChainApi, Block>>
+where
+	Block: BlockT,
+	ChainApi: graph::ChainApi<Block = Block> + 'static,
+{
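+	// Used by the tracked map backing the mempool to maintain its total byte count.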
+	fn size(&self) -> usize {
+		self.bytes
+	}
+}
+
 type InternalTxMemPoolMap<ChainApi, Block> =
-	HashMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>>;
-type InternalTxMemPoolMapEntry<'a, ChainApi, Block> =
-	Entry<'a, ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>>;
+	graph::tracked_map::TrackedMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>>;
 
 /// An intermediary transactions buffer.
 ///
@@ -153,13 +162,16 @@ where
 	///
 	///  The key is the hash of the transaction, and the value is a wrapper
 	///  structure, which contains the mempool specific details of the transaction.
-	transactions: RwLock<InternalTxMemPoolMap<ChainApi, Block>>,
+	transactions: InternalTxMemPoolMap<ChainApi, Block>,
 
 	/// Prometheus's metrics endpoint.
 	metrics: PrometheusMetrics,
 
 	/// Indicates the maximum number of transactions that can be maintained in the memory pool.
 	max_transactions_count: usize,
+
+	/// Maximum total size, in bytes, of the encoded transactions in the memory pool.
+	max_transactions_total_bytes: usize,
 }
 
 impl<ChainApi, Block> TxMemPool<ChainApi, Block>
@@ -175,19 +187,32 @@ where
 		listener: Arc<MultiViewListener<ChainApi>>,
 		metrics: PrometheusMetrics,
 		max_transactions_count: usize,
+		max_transactions_total_bytes: usize,
 	) -> Self {
-		Self { api, listener, transactions: Default::default(), metrics, max_transactions_count }
+		Self {
+			api,
+			listener,
+			transactions: Default::default(),
+			metrics,
+			max_transactions_count,
+			max_transactions_total_bytes,
+		}
 	}
 
 	/// Creates a new `TxMemPool` instance for testing purposes.
 	#[allow(dead_code)]
-	fn new_test(api: Arc<ChainApi>, max_transactions_count: usize) -> Self {
+	fn new_test(
+		api: Arc<ChainApi>,
+		max_transactions_count: usize,
+		max_transactions_total_bytes: usize,
+	) -> Self {
 		Self {
 			api,
 			listener: Arc::from(MultiViewListener::new()),
 			transactions: Default::default(),
 			metrics: Default::default(),
 			max_transactions_count,
+			max_transactions_total_bytes,
 		}
 	}
 
@@ -200,28 +225,42 @@ where
 	}
 
 	/// Returns a tuple with the count of unwatched and watched transactions in the memory pool.
-	pub(super) fn unwatched_and_watched_count(&self) -> (usize, usize) {
+	pub fn unwatched_and_watched_count(&self) -> (usize, usize) {
 		let transactions = self.transactions.read();
 		let watched_count = transactions.values().filter(|t| t.is_watched()).count();
 		(transactions.len() - watched_count, watched_count)
 	}
 
+	/// Returns the number of bytes used by all extrinsics in the pool.
+	#[cfg(test)]
+	pub fn bytes(&self) -> usize {
+		self.transactions.bytes()
+	}
+
+	/// Returns true if the provided transaction count or total byte size would exceed the
+	/// configured limits.
+	fn is_limit_exceeded(&self, length: usize, current_total_bytes: usize) -> bool {
+		length > self.max_transactions_count ||
+			current_total_bytes > self.max_transactions_total_bytes
+	}
+
 	/// Attempts to insert a transaction into the memory pool, ensuring it does not
 	/// exceed the maximum allowed transaction count or total byte size.
 	fn try_insert(
 		&self,
-		current_len: usize,
-		entry: InternalTxMemPoolMapEntry<'_, ChainApi, Block>,
 		hash: ExtrinsicHash<ChainApi>,
 		tx: TxInMemPool<ChainApi, Block>,
 	) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
-		//todo: obey size limits [#5476]
-		let result = match (current_len < self.max_transactions_count, entry) {
-			(true, Entry::Vacant(v)) => {
-				v.insert(Arc::from(tx));
+		let bytes = self.transactions.bytes();
+		let mut transactions = self.transactions.write();
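+		// Reject the transaction if it is already known, or if adding it would exceed either the
+		// count limit or the total-bytes limit.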
+		let result = match (
+			!self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes),
+			transactions.contains_key(&hash),
+		) {
+			(true, false) => {
+				transactions.insert(hash, Arc::from(tx));
 				Ok(hash)
 			},
-			(_, Entry::Occupied(_)) =>
+			(_, true) =>
 				Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)).into()),
 			(false, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped.into()),
 		};
@@ -239,17 +278,11 @@ where
 		source: TransactionSource,
 		xts: &[ExtrinsicFor<ChainApi>],
 	) -> Vec<Result<ExtrinsicHash<ChainApi>, ChainApi::Error>> {
-		let mut transactions = self.transactions.write();
 		let result = xts
 			.iter()
 			.map(|xt| {
-				let hash = self.api.hash_and_length(&xt).0;
-				self.try_insert(
-					transactions.len(),
-					transactions.entry(hash),
-					hash,
-					TxInMemPool::new_unwatched(source, xt.clone()),
-				)
+				let (hash, length) = self.api.hash_and_length(&xt);
+				self.try_insert(hash, TxInMemPool::new_unwatched(source, xt.clone(), length))
 			})
 			.collect::<Vec<_>>();
 		result
@@ -262,14 +295,8 @@ where
 		source: TransactionSource,
 		xt: ExtrinsicFor<ChainApi>,
 	) -> Result<ExtrinsicHash<ChainApi>, ChainApi::Error> {
-		let mut transactions = self.transactions.write();
-		let hash = self.api.hash_and_length(&xt).0;
-		self.try_insert(
-			transactions.len(),
-			transactions.entry(hash),
-			hash,
-			TxInMemPool::new_watched(source, xt.clone()),
-		)
+		let (hash, length) = self.api.hash_and_length(&xt);
+		self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length))
 	}
 
 	/// Removes transactions from the memory pool which are specified by the given list of hashes
@@ -324,12 +351,11 @@ where
 		let start = Instant::now();
 
 		let (count, input) = {
-			let transactions = self.transactions.read();
+			let transactions = self.transactions.clone_map();
 
 			(
 				transactions.len(),
 				transactions
-					.clone()
 					.into_iter()
 					.filter(|xt| {
 						let finalized_block_number = finalized_block.number.into().as_u64();
@@ -417,8 +443,8 @@ where
 #[cfg(test)]
 mod tx_mem_pool_tests {
 	use super::*;
-	use crate::common::tests::TestApi;
-	use substrate_test_runtime::{AccountId, Extrinsic, Transfer, H256};
+	use crate::{common::tests::TestApi, graph::ChainApi};
+	use substrate_test_runtime::{AccountId, Extrinsic, ExtrinsicBuilder, Transfer, H256};
 	use substrate_test_runtime_client::AccountKeyring::*;
 	fn uxt(nonce: u64) -> Extrinsic {
 		crate::common::tests::uxt(Transfer {
@@ -433,7 +459,7 @@ mod tx_mem_pool_tests {
 	fn extend_unwatched_obeys_limit() {
 		let max = 10;
 		let api = Arc::from(TestApi::default());
-		let mempool = TxMemPool::new_test(api, max);
+		let mempool = TxMemPool::new_test(api, max, usize::MAX);
 
 		let xts = (0..max + 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
 
@@ -450,7 +476,7 @@ mod tx_mem_pool_tests {
 		sp_tracing::try_init_simple();
 		let max = 10;
 		let api = Arc::from(TestApi::default());
-		let mempool = TxMemPool::new_test(api, max);
+		let mempool = TxMemPool::new_test(api, max, usize::MAX);
 
 		let mut xts = (0..max - 1).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
 		xts.push(xts.iter().last().unwrap().clone());
@@ -467,7 +493,7 @@ mod tx_mem_pool_tests {
 	fn push_obeys_limit() {
 		let max = 10;
 		let api = Arc::from(TestApi::default());
-		let mempool = TxMemPool::new_test(api, max);
+		let mempool = TxMemPool::new_test(api, max, usize::MAX);
 
 		let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
 
@@ -492,7 +518,7 @@ mod tx_mem_pool_tests {
 	fn push_detects_already_imported() {
 		let max = 10;
 		let api = Arc::from(TestApi::default());
-		let mempool = TxMemPool::new_test(api, 2 * max);
+		let mempool = TxMemPool::new_test(api, 2 * max, usize::MAX);
 
 		let xts = (0..max).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
 		let xt0 = xts.iter().last().unwrap().clone();
@@ -517,7 +543,7 @@ mod tx_mem_pool_tests {
 	fn count_works() {
 		let max = 100;
 		let api = Arc::from(TestApi::default());
-		let mempool = TxMemPool::new_test(api, max);
+		let mempool = TxMemPool::new_test(api, max, usize::MAX);
 
 		let xts0 = (0..10).map(|x| Arc::from(uxt(x as _))).collect::<Vec<_>>();
 
@@ -532,4 +558,39 @@ mod tx_mem_pool_tests {
 		assert!(results.iter().all(Result::is_ok));
 		assert_eq!(mempool.unwatched_and_watched_count(), (10, 5));
 	}
+
+	fn large_uxt(x: usize) -> Extrinsic {
+		ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build()
+	}
+
+	#[test]
+	fn push_obeys_size_limit() {
+		sp_tracing::try_init_simple();
+		let max = 10;
+		let api = Arc::from(TestApi::default());
+		// size of a large extrinsic is 1129 bytes
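+		// the count limit is effectively disabled (usize::MAX); only the byte limit is exercised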
+		let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * 1129);
+
+		let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::<Vec<_>>();
+
+		let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1);
+
+		let results = mempool.extend_unwatched(TransactionSource::External, &xts);
+		assert!(results.iter().all(Result::is_ok));
+		assert_eq!(mempool.bytes(), total_xts_bytes);
+
+		let xt = Arc::from(large_uxt(98));
+		let result = mempool.push_watched(TransactionSource::External, xt);
+		assert!(matches!(
+			result.unwrap_err(),
+			sc_transaction_pool_api::error::Error::ImmediatelyDropped
+		));
+
+		let xt = Arc::from(large_uxt(99));
+		let mut result = mempool.extend_unwatched(TransactionSource::External, &[xt]);
+		assert!(matches!(
+			result.pop().unwrap().unwrap_err(),
+			sc_transaction_pool_api::error::Error::ImmediatelyDropped
+		));
+	}
 }
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
index 413fca223242a46fde881b848bb39164250a641b..f23dcedd5bfd1b17b1c976708812fbb6251ca13c 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
@@ -91,7 +91,6 @@ where
 		&self,
 		source: TransactionSource,
 		xts: impl IntoIterator<Item = ExtrinsicFor<ChainApi>> + Clone,
-		xts_hashes: impl IntoIterator<Item = ExtrinsicHash<ChainApi>> + Clone,
 	) -> HashMap<Block::Hash, Vec<Result<ExtrinsicHash<ChainApi>, ChainApi::Error>>> {
 		let submit_futures = {
 			let active_views = self.active_views.read();
@@ -100,9 +99,7 @@ where
 				.map(|(_, view)| {
 					let view = view.clone();
 					let xts = xts.clone();
-					self.dropped_stream_controller
-						.add_initial_views(xts_hashes.clone(), view.at.hash);
-					async move { (view.at.hash, view.submit_many(source, xts.clone()).await) }
+					async move { (view.at.hash, view.submit_many(source, xts).await) }
 				})
 				.collect::<Vec<_>>()
 		};
@@ -127,11 +124,7 @@ where
 
 		let result = active_views
 			.iter()
-			.map(|view| {
-				self.dropped_stream_controller
-					.add_initial_views(std::iter::once(tx_hash), view.at.hash);
-				view.submit_local(xt.clone())
-			})
+			.map(|view| view.submit_local(xt.clone()))
 			.find_or_first(Result::is_ok);
 
 		if let Some(Err(err)) = result {
@@ -154,10 +147,10 @@ where
 		_at: Block::Hash,
 		source: TransactionSource,
 		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<TxStatusStream<ChainApi>, (ChainApi::Error, Option<TxStatusStream<ChainApi>>)> {
+	) -> Result<TxStatusStream<ChainApi>, ChainApi::Error> {
 		let tx_hash = self.api.hash_and_length(&xt).0;
 		let Some(external_watcher) = self.listener.create_external_watcher_for_tx(tx_hash) else {
-			return Err((PoolError::AlreadyImported(Box::new(tx_hash)).into(), None))
+			return Err(PoolError::AlreadyImported(Box::new(tx_hash)).into())
 		};
 		let submit_and_watch_futures = {
 			let active_views = self.active_views.read();
@@ -166,8 +159,6 @@ where
 				.map(|(_, view)| {
 					let view = view.clone();
 					let xt = xt.clone();
-					self.dropped_stream_controller
-						.add_initial_views(std::iter::once(tx_hash), view.at.hash);
 					async move {
 						match view.submit_and_watch(source, xt).await {
 							Ok(watcher) => {
@@ -191,7 +182,7 @@ where
 
 		if let Some(Err(err)) = maybe_error {
 			log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err);
-			return Err((err, Some(external_watcher)));
+			return Err(err);
 		};
 
 		Ok(external_watcher)
diff --git a/substrate/client/transaction-pool/src/graph/mod.rs b/substrate/client/transaction-pool/src/graph/mod.rs
index c1225d7356d94542f674615ce9cfb2be4be912e3..d93898b1b22ab7dbeeb910bd80e4da7f644fceae 100644
--- a/substrate/client/transaction-pool/src/graph/mod.rs
+++ b/substrate/client/transaction-pool/src/graph/mod.rs
@@ -31,7 +31,7 @@ mod listener;
 mod pool;
 mod ready;
 mod rotator;
-mod tracked_map;
+pub(crate) mod tracked_map;
 mod validated_pool;
 
 pub mod base_pool;
diff --git a/substrate/client/transaction-pool/src/graph/tracked_map.rs b/substrate/client/transaction-pool/src/graph/tracked_map.rs
index 9e92dffc9f96fad53e12d344bec1127967207d65..6c3bbbf34b553e3e03f47d11317cd9f52aafc847 100644
--- a/substrate/client/transaction-pool/src/graph/tracked_map.rs
+++ b/substrate/client/transaction-pool/src/graph/tracked_map.rs
@@ -18,7 +18,7 @@
 
 use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 use std::{
-	collections::HashMap,
+	collections::{hash_map::Iter, HashMap},
 	sync::{
 		atomic::{AtomicIsize, Ordering as AtomicOrdering},
 		Arc,
@@ -101,20 +101,30 @@ impl<'a, K, V> TrackedMapReadAccess<'a, K, V>
 where
 	K: Eq + std::hash::Hash,
 {
-	/// Returns true if map contains key.
+	/// Returns true if the map contains given key.
 	pub fn contains_key(&self, key: &K) -> bool {
 		self.inner_guard.contains_key(key)
 	}
 
-	/// Returns reference to the contained value by key, if exists.
+	/// Returns the reference to the contained value by key, if exists.
 	pub fn get(&self, key: &K) -> Option<&V> {
 		self.inner_guard.get(key)
 	}
 
-	/// Returns iterator over all values.
+	/// Returns an iterator over all values.
 	pub fn values(&self) -> std::collections::hash_map::Values<K, V> {
 		self.inner_guard.values()
 	}
+
+	/// Returns the number of elements in the map.
+	pub fn len(&self) -> usize {
+		self.inner_guard.len()
+	}
+
+	/// Returns an iterator over all key-value pairs.
+	pub fn iter(&self) -> Iter<'_, K, V> {
+		self.inner_guard.iter()
+	}
 }
 
 pub struct TrackedMapWriteAccess<'a, K, V> {
@@ -149,10 +159,20 @@ where
 		val
 	}
 
+	/// Returns `true` if the inner map contains a value for the specified key.
+	pub fn contains_key(&self, key: &K) -> bool {
+		self.inner_guard.contains_key(key)
+	}
+
 	/// Returns mutable reference to the contained value by key, if exists.
 	pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
 		self.inner_guard.get_mut(key)
 	}
+
+	/// Returns the number of elements in the map.
+	pub fn len(&mut self) -> usize {
+		self.inner_guard.len()
+	}
 }
 
 #[cfg(test)]
diff --git a/substrate/client/transaction-pool/tests/fatp_common/mod.rs b/substrate/client/transaction-pool/tests/fatp_common/mod.rs
index 63af729b8b730cd1af007de3658e1152ff4b91b7..15f2b7f79c14764c655b3d58d9e381fcbeb3eaea 100644
--- a/substrate/client/transaction-pool/tests/fatp_common/mod.rs
+++ b/substrate/client/transaction-pool/tests/fatp_common/mod.rs
@@ -186,9 +186,9 @@ macro_rules! assert_pool_status {
 
 #[macro_export]
 macro_rules! assert_ready_iterator {
-	($hash:expr, $pool:expr, [$( $xt:expr ),+]) => {{
+	($hash:expr, $pool:expr, [$( $xt:expr ),*]) => {{
 		let ready_iterator = $pool.ready_at($hash).now_or_never().unwrap();
-		let expected = vec![ $($pool.api().hash_and_length(&$xt).0),+];
+		let expected = vec![ $($pool.api().hash_and_length(&$xt).0),*];
 		let output: Vec<_> = ready_iterator.collect();
 		log::debug!(target:LOG_TARGET, "expected: {:#?}", expected);
 		log::debug!(target:LOG_TARGET, "output: {:#?}", output);
diff --git a/substrate/client/transaction-pool/tests/fatp_limits.rs b/substrate/client/transaction-pool/tests/fatp_limits.rs
index 6fd5f93ed070d91a639e300b2bead0c4d7dac56e..03792fd89dfacbf30e00a9ecaaf8a4a8cabbcfb8 100644
--- a/substrate/client/transaction-pool/tests/fatp_limits.rs
+++ b/substrate/client/transaction-pool/tests/fatp_limits.rs
@@ -19,6 +19,7 @@
 //! Tests of limits for fork-aware transaction pool.
 
 pub mod fatp_common;
+
 use fatp_common::{
 	finalized_block_event, invalid_hash, new_best_block_event, TestPoolBuilder, LOG_TARGET, SOURCE,
 };
@@ -27,6 +28,7 @@ use sc_transaction_pool::ChainApi;
 use sc_transaction_pool_api::{
 	error::Error as TxPoolError, MaintainedTransactionPool, TransactionPool, TransactionStatus,
 };
+use std::thread::sleep;
 use substrate_test_runtime_client::AccountKeyring::*;
 use substrate_test_runtime_transaction_pool::uxt;
 
@@ -92,25 +94,103 @@ fn fatp_limits_ready_count_works() {
 	//charlie was not included into view:
 	assert_pool_status!(header01.hash(), &pool, 2, 0);
 	assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]);
+	//todo: can we do better? We don't have an API to check if the event was processed internally.
+	let mut counter = 0;
+	while pool.mempool_len().0 == 3 {
+		sleep(std::time::Duration::from_millis(1));
+		counter += 1;
+		assert!(counter <= 20, "timeout");
+	}
+	assert_eq!(pool.mempool_len().0, 2);
 
 	//branch with alice transactions:
 	let header02b = api.push_block(2, vec![xt1.clone(), xt2.clone()], true);
 	let event = new_best_block_event(&pool, Some(header01.hash()), header02b.hash());
 	block_on(pool.maintain(event));
-	assert_eq!(pool.mempool_len().0, 3);
-	//charlie was resubmitted from mmepool into the view:
-	assert_pool_status!(header02b.hash(), &pool, 1, 0);
-	assert_ready_iterator!(header02b.hash(), pool, [xt0]);
+	assert_eq!(pool.mempool_len().0, 2);
+	assert_pool_status!(header02b.hash(), &pool, 0, 0);
+	assert_ready_iterator!(header02b.hash(), pool, []);
 
 	//branch with alice/charlie transactions shall also work:
 	let header02a = api.push_block(2, vec![xt0.clone(), xt1.clone()], true);
+	api.set_nonce(header02a.hash(), Alice.into(), 201);
 	let event = new_best_block_event(&pool, Some(header02b.hash()), header02a.hash());
 	block_on(pool.maintain(event));
-	assert_eq!(pool.mempool_len().0, 3);
-	assert_pool_status!(header02a.hash(), &pool, 1, 0);
+	assert_eq!(pool.mempool_len().0, 2);
+	// assert_pool_status!(header02a.hash(), &pool, 1, 0);
 	assert_ready_iterator!(header02a.hash(), pool, [xt2]);
 }
 
+#[test]
+fn fatp_limits_ready_count_works_for_submit_at() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(3).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 200);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 500);
+
+	let header01 = api.push_block(1, vec![], true);
+
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Charlie, 500);
+	let xt1 = uxt(Alice, 200);
+	let xt2 = uxt(Alice, 201);
+
+	let results = block_on(pool.submit_at(
+		header01.hash(),
+		SOURCE,
+		vec![xt0.clone(), xt1.clone(), xt2.clone()],
+	))
+	.unwrap();
+
+	assert!(matches!(results[0].as_ref().unwrap_err().0, TxPoolError::ImmediatelyDropped));
+	assert!(results[1].as_ref().is_ok());
+	assert!(results[2].as_ref().is_ok());
+	assert_eq!(pool.mempool_len().0, 2);
+	//charlie was not included into view:
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]);
+}
+
+#[test]
+fn fatp_limits_ready_count_works_for_submit_and_watch() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(3).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 500);
+
+	let header01 = api.push_block(1, vec![], true);
+
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Charlie, 500);
+	let xt1 = uxt(Alice, 200);
+	let xt2 = uxt(Bob, 300);
+	api.set_priority(&xt0, 2);
+	api.set_priority(&xt1, 2);
+	api.set_priority(&xt2, 1);
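+	// xt2 has the lowest priority, so it is the transaction dropped when the ready limit (2) is hit.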
+
+	let result0 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()));
+	let result1 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()));
+	let result2 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).map(|_| ());
+
+	assert!(matches!(result2.unwrap_err().0, TxPoolError::ImmediatelyDropped));
+	assert!(result0.is_ok());
+	assert!(result1.is_ok());
+	assert_eq!(pool.mempool_len().1, 2);
+	//xt2 (lowest priority) was not included into the view:
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt0, xt1]);
+}
+
 #[test]
 fn fatp_limits_future_count_works() {
 	sp_tracing::try_init_simple();
@@ -131,29 +211,33 @@ fn fatp_limits_future_count_works() {
 	let xt2 = uxt(Alice, 201);
 	let xt3 = uxt(Alice, 202);
 
-	let submissions = vec![
-		pool.submit_one(header01.hash(), SOURCE, xt1.clone()),
-		pool.submit_one(header01.hash(), SOURCE, xt2.clone()),
-		pool.submit_one(header01.hash(), SOURCE, xt3.clone()),
-	];
+	block_on(pool.submit_one(header01.hash(), SOURCE, xt1.clone())).unwrap();
+	block_on(pool.submit_one(header01.hash(), SOURCE, xt2.clone())).unwrap();
+	block_on(pool.submit_one(header01.hash(), SOURCE, xt3.clone())).unwrap();
 
-	let results = block_on(futures::future::join_all(submissions));
-	assert!(results.iter().all(Result::is_ok));
 	//charlie was not included into view due to limits:
 	assert_pool_status!(header01.hash(), &pool, 0, 2);
+	//todo: can we do better? We don't have an API to check if the event was processed internally.
+	let mut counter = 0;
+	while pool.mempool_len().0 != 2 {
+		sleep(std::time::Duration::from_millis(1));
+		counter += 1;
+		assert!(counter <= 20, "timeout");
+	}
 
 	let header02 = api.push_block(2, vec![xt0], true);
 	api.set_nonce(header02.hash(), Alice.into(), 201); //redundant
 	let event = new_best_block_event(&pool, Some(header01.hash()), header02.hash());
 	block_on(pool.maintain(event));
 
-	//charlie was resubmitted from mmepool into the view:
-	assert_pool_status!(header02.hash(), &pool, 2, 1);
-	assert_eq!(pool.mempool_len().0, 3);
+	assert_pool_status!(header02.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().0, 2);
 }
 
 #[test]
-fn fatp_limits_watcher_mempool_prevents_dropping() {
+fn fatp_limits_watcher_mempool_doesnt_prevent_dropping() {
 	sp_tracing::try_init_simple();
 
 	let builder = TestPoolBuilder::new();
@@ -169,23 +253,15 @@ fn fatp_limits_watcher_mempool_prevents_dropping() {
 	let xt1 = uxt(Bob, 300);
 	let xt2 = uxt(Alice, 200);
 
-	let submissions = vec![
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()),
-	];
-	let mut submissions = block_on(futures::future::join_all(submissions));
-	let xt2_watcher = submissions.remove(2).unwrap();
-	let xt1_watcher = submissions.remove(1).unwrap();
-	let xt0_watcher = submissions.remove(0).unwrap();
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
 
 	assert_pool_status!(header01.hash(), &pool, 2, 0);
 
-	let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(1).collect::<Vec<_>>();
-
+	let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::<Vec<_>>();
 	log::debug!("xt0_status: {:#?}", xt0_status);
-
-	assert_eq!(xt0_status, vec![TransactionStatus::Ready]);
+	assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
 	let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(1).collect::<Vec<_>>();
 
 	assert_eq!(xt1_status, vec![TransactionStatus::Ready]);
@@ -214,28 +290,23 @@ fn fatp_limits_watcher_non_intial_view_drops_transaction() {
 	let xt1 = uxt(Charlie, 400);
 	let xt2 = uxt(Bob, 300);
 
-	let submissions = vec![
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()),
-	];
-	let mut submissions = block_on(futures::future::join_all(submissions));
-	let xt2_watcher = submissions.remove(2).unwrap();
-	let xt1_watcher = submissions.remove(1).unwrap();
-	let xt0_watcher = submissions.remove(0).unwrap();
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+
+	// make sure xt0 is actually dropped before checking the iterator
+	let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::<Vec<_>>();
+	assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
 
 	assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]);
 
 	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
 	block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash())));
 	assert_pool_status!(header02.hash(), &pool, 2, 0);
-	assert_ready_iterator!(header02.hash(), pool, [xt2, xt0]);
-
-	let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(1).collect::<Vec<_>>();
-	assert_eq!(xt0_status, vec![TransactionStatus::Ready]);
+	assert_ready_iterator!(header02.hash(), pool, [xt1, xt2]);
 
-	let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::<Vec<_>>();
-	assert_eq!(xt1_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
+	let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(1).collect::<Vec<_>>();
+	assert_eq!(xt1_status, vec![TransactionStatus::Ready]);
 
 	let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(1).collect::<Vec<_>>();
 	assert_eq!(xt2_status, vec![TransactionStatus::Ready]);
@@ -259,32 +330,19 @@ fn fatp_limits_watcher_finalized_transaction_frees_ready_space() {
 	let xt1 = uxt(Charlie, 400);
 	let xt2 = uxt(Bob, 300);
 
-	let submissions = vec![
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()),
-	];
-	let mut submissions = block_on(futures::future::join_all(submissions));
-	let xt2_watcher = submissions.remove(2).unwrap();
-	let xt1_watcher = submissions.remove(1).unwrap();
-	let xt0_watcher = submissions.remove(0).unwrap();
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
 	assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]);
 
+	let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::<Vec<_>>();
+	assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
+
 	let header02 = api.push_block_with_parent(header01.hash(), vec![xt0.clone()], true);
 	block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash())));
 	assert_pool_status!(header02.hash(), &pool, 2, 0);
 	assert_ready_iterator!(header02.hash(), pool, [xt1, xt2]);
 
-	let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(3).collect::<Vec<_>>();
-	assert_eq!(
-		xt0_status,
-		vec![
-			TransactionStatus::Ready,
-			TransactionStatus::InBlock((header02.hash(), 0)),
-			TransactionStatus::Finalized((header02.hash(), 0))
-		]
-	);
-
 	let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(1).collect::<Vec<_>>();
 	assert_eq!(xt1_status, vec![TransactionStatus::Ready]);
 
@@ -311,43 +369,275 @@ fn fatp_limits_watcher_view_can_drop_transcation() {
 	let xt2 = uxt(Bob, 300);
 	let xt3 = uxt(Alice, 200);
 
-	let submissions = vec![
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()),
-		pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()),
-	];
-	let mut submissions = block_on(futures::future::join_all(submissions));
-	let xt2_watcher = submissions.remove(2).unwrap();
-	let xt1_watcher = submissions.remove(1).unwrap();
-	let xt0_watcher = submissions.remove(0).unwrap();
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+
+	let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::<Vec<_>>();
+	assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
 
 	assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]);
 
-	let header02 = api.push_block_with_parent(header01.hash(), vec![xt0.clone()], true);
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
 	block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash())));
 
-	let submission = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone()));
-	let xt3_watcher = submission.unwrap();
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+
+	let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::<Vec<_>>();
+	assert_eq!(xt1_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
 
 	assert_pool_status!(header02.hash(), pool, 2, 0);
 	assert_ready_iterator!(header02.hash(), pool, [xt2, xt3]);
 
-	let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(3).collect::<Vec<_>>();
-	assert_eq!(
-		xt0_status,
-		vec![
-			TransactionStatus::Ready,
-			TransactionStatus::InBlock((header02.hash(), 0)),
-			TransactionStatus::Finalized((header02.hash(), 0))
-		]
+	let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(1).collect::<Vec<_>>();
+	assert_eq!(xt2_status, vec![TransactionStatus::Ready]);
+
+	let xt3_status = futures::executor::block_on_stream(xt3_watcher).take(1).collect::<Vec<_>>();
+	assert_eq!(xt3_status, vec![TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_limits_watcher_empty_and_full_view_immediately_drops() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+	api.set_nonce(api.genesis_hash(), Eve.into(), 600);
+	api.set_nonce(api.genesis_hash(), Ferdie.into(), 700);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+
+	let xt3 = uxt(Dave, 500);
+	let xt4 = uxt(Eve, 600);
+	let xt5 = uxt(Ferdie, 700);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+
+	let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::<Vec<_>>();
+	assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
+
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 2);
+
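+	// Import a block containing xt0..xt2 so the resulting view starts empty.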
+	let header02e = api.push_block_with_parent(
+		header01.hash(),
+		vec![xt0.clone(), xt1.clone(), xt2.clone()],
+		true,
 	);
+	api.set_nonce(header02e.hash(), Alice.into(), 201);
+	api.set_nonce(header02e.hash(), Bob.into(), 301);
+	api.set_nonce(header02e.hash(), Charlie.into(), 401);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02e.hash())));
+
+	assert_pool_status!(header02e.hash(), &pool, 0, 0);
+
+	let header02f = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02f.hash())));
+	assert_pool_status!(header02f.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header02f.hash(), pool, [xt1, xt2]);
+
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+	let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap();
+	let result5 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt5.clone())).map(|_| ());
+
+	//xt5 hits internal mempool limit
+	assert!(matches!(result5.unwrap_err().0, TxPoolError::ImmediatelyDropped));
+
+	assert_pool_status!(header02e.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header02e.hash(), pool, [xt3, xt4]);
+	assert_eq!(pool.mempool_len().1, 4);
 
 	let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::<Vec<_>>();
-	assert_eq!(xt1_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
+	assert_eq!(
+		xt1_status,
+		vec![TransactionStatus::Ready, TransactionStatus::InBlock((header02e.hash(), 1))]
+	);
 
-	let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(1).collect::<Vec<_>>();
-	assert_eq!(xt2_status, vec![TransactionStatus::Ready]);
+	let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(2).collect::<Vec<_>>();
+	assert_eq!(
+		xt2_status,
+		vec![TransactionStatus::Ready, TransactionStatus::InBlock((header02e.hash(), 2))]
+	);
 
 	let xt3_status = futures::executor::block_on_stream(xt3_watcher).take(1).collect::<Vec<_>>();
 	assert_eq!(xt3_status, vec![TransactionStatus::Ready]);
+	let xt4_status = futures::executor::block_on_stream(xt4_watcher).take(1).collect::<Vec<_>>();
+	assert_eq!(xt4_status, vec![TransactionStatus::Ready]);
+}
+
+#[test]
+fn fatp_limits_watcher_empty_and_full_view_drops_with_event() {
+	// it is almost a copy of fatp_limits_watcher_empty_and_full_view_immediately_drops, but the
+	// mempool_count limit is set to 5 (vs 4).
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_mempool_count_limit(5).with_ready_count(2).build();
+	api.set_nonce(api.genesis_hash(), Bob.into(), 300);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
+	api.set_nonce(api.genesis_hash(), Dave.into(), 500);
+	api.set_nonce(api.genesis_hash(), Eve.into(), 600);
+	api.set_nonce(api.genesis_hash(), Ferdie.into(), 700);
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Alice, 200);
+	let xt1 = uxt(Bob, 300);
+	let xt2 = uxt(Charlie, 400);
+
+	let xt3 = uxt(Dave, 500);
+	let xt4 = uxt(Eve, 600);
+	let xt5 = uxt(Ferdie, 700);
+
+	let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
+	let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();
+	let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
+
+	let xt0_status = futures::executor::block_on_stream(xt0_watcher).take(2).collect::<Vec<_>>();
+	assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
+
+	assert_pool_status!(header01.hash(), &pool, 2, 0);
+	assert_eq!(pool.mempool_len().1, 2);
+
+	let header02e = api.push_block_with_parent(
+		header01.hash(),
+		vec![xt0.clone(), xt1.clone(), xt2.clone()],
+		true,
+	);
+	api.set_nonce(header02e.hash(), Alice.into(), 201);
+	api.set_nonce(header02e.hash(), Bob.into(), 301);
+	api.set_nonce(header02e.hash(), Charlie.into(), 401);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02e.hash())));
+
+	assert_pool_status!(header02e.hash(), &pool, 0, 0);
+
+	let header02f = api.push_block_with_parent(header01.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02f.hash())));
+	assert_pool_status!(header02f.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header02f.hash(), pool, [xt1, xt2]);
+
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
+	let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap();
+	let xt5_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt5.clone())).unwrap();
+
+	assert_pool_status!(header02e.hash(), &pool, 2, 0);
+	assert_ready_iterator!(header02e.hash(), pool, [xt4, xt5]);
+
+	let xt3_status = futures::executor::block_on_stream(xt3_watcher).take(2).collect::<Vec<_>>();
+	assert_eq!(xt3_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
+
+	//xt3 got dropped
+	assert_eq!(pool.mempool_len().1, 4);
+
+	let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::<Vec<_>>();
+	assert_eq!(
+		xt1_status,
+		vec![TransactionStatus::Ready, TransactionStatus::InBlock((header02e.hash(), 1))]
+	);
+
+	let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(2).collect::<Vec<_>>();
+	assert_eq!(
+		xt2_status,
+		vec![TransactionStatus::Ready, TransactionStatus::InBlock((header02e.hash(), 2))]
+	);
+
+	let xt4_status = futures::executor::block_on_stream(xt4_watcher).take(1).collect::<Vec<_>>();
+	assert_eq!(xt4_status, vec![TransactionStatus::Ready]);
+
+	let xt5_status = futures::executor::block_on_stream(xt5_watcher).take(1).collect::<Vec<_>>();
+	assert_eq!(xt5_status, vec![TransactionStatus::Ready]);
+}
+
+fn large_uxt(x: usize) -> substrate_test_runtime::Extrinsic {
+	substrate_test_runtime::ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build()
+}
+
+#[test]
+fn fatp_limits_ready_size_works() {
+	sp_tracing::try_init_simple();
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder.with_ready_bytes_size(3390).with_future_bytes_size(0).build();
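+	// 3390 bytes holds three large (1129-byte) extrinsics; a fourth would exceed the ready size limit.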
+
+	let header01 = api.push_block(1, vec![], true);
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = large_uxt(0);
+	let xt1 = large_uxt(1);
+	let xt2 = large_uxt(2);
+
+	let submissions = vec![
+		pool.submit_one(header01.hash(), SOURCE, xt0.clone()),
+		pool.submit_one(header01.hash(), SOURCE, xt1.clone()),
+		pool.submit_one(header01.hash(), SOURCE, xt2.clone()),
+	];
+
+	let results = block_on(futures::future::join_all(submissions));
+	assert!(results.iter().all(Result::is_ok));
+	//all three transactions fit into the view:
+	assert_pool_status!(header01.hash(), &pool, 3, 0);
+	assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2]);
+
+	let xt3 = large_uxt(3);
+	let result3 = block_on(pool.submit_one(header01.hash(), SOURCE, xt3.clone()));
+	assert!(matches!(result3.as_ref().unwrap_err().0, TxPoolError::ImmediatelyDropped));
+}
+
+#[test]
+fn fatp_limits_future_size_works() {
+	sp_tracing::try_init_simple();
+	const UXT_SIZE: usize = 137;
+
+	let builder = TestPoolBuilder::new();
+	let (pool, api, _) = builder
+		.with_ready_bytes_size(UXT_SIZE)
+		.with_future_bytes_size(3 * UXT_SIZE)
+		.build();
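+	// The future queue can hold three UXT_SIZE-byte transactions; submitting a fourth forces one to be dropped.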
+	api.set_nonce(api.genesis_hash(), Bob.into(), 200);
+	api.set_nonce(api.genesis_hash(), Charlie.into(), 500);
+
+	let header01 = api.push_block(1, vec![], true);
+
+	let event = new_best_block_event(&pool, None, header01.hash());
+	block_on(pool.maintain(event));
+
+	let xt0 = uxt(Bob, 201);
+	let xt1 = uxt(Charlie, 501);
+	let xt2 = uxt(Alice, 201);
+	let xt3 = uxt(Alice, 202);
+	assert_eq!(api.hash_and_length(&xt0).1, UXT_SIZE);
+	assert_eq!(api.hash_and_length(&xt1).1, UXT_SIZE);
+	assert_eq!(api.hash_and_length(&xt2).1, UXT_SIZE);
+	assert_eq!(api.hash_and_length(&xt3).1, UXT_SIZE);
+
+	let _ = block_on(pool.submit_one(header01.hash(), SOURCE, xt0.clone())).unwrap();
+	let _ = block_on(pool.submit_one(header01.hash(), SOURCE, xt1.clone())).unwrap();
+	let _ = block_on(pool.submit_one(header01.hash(), SOURCE, xt2.clone())).unwrap();
+	let _ = block_on(pool.submit_one(header01.hash(), SOURCE, xt3.clone())).unwrap();
+
+	//todo: can we do better? We don't have an API to check if the event was processed internally.
+	let mut counter = 0;
+	while pool.mempool_len().0 == 4 {
+		sleep(std::time::Duration::from_millis(1));
+		counter += 1;
+		assert!(counter <= 20, "timeout");
+	}
+	assert_pool_status!(header01.hash(), &pool, 0, 3);
+	assert_eq!(pool.mempool_len().0, 3);
 }