diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs
index 322726a1eff43830f4b84252bc95335978535cfc..52a19da220c05be35edf336c46745e496b76d06d 100644
--- a/substrate/client/service/src/lib.rs
+++ b/substrate/client/service/src/lib.rs
@@ -40,7 +40,7 @@ use std::{
 use codec::{Decode, Encode};
 use futures::{pin_mut, FutureExt, StreamExt};
 use jsonrpsee::RpcModule;
-use log::{debug, error, warn};
+use log::{debug, error, trace, warn};
 use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
 use sc_network::{
 	config::MultiaddrWithPeerId, service::traits::NetworkService, NetworkBackend, NetworkBlock,
@@ -538,7 +538,7 @@ where
 			{
 				Ok(_) => {
 					let elapsed = start.elapsed();
-					debug!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}");
+					trace!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}");
 					TransactionImport::NewGood
 				},
 				Err(e) => match e.into_pool_error() {
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
index 3588645344ba5fd3a22968be6f28901f463a6796..be20a1608961945c8727e6e6feb24146ba64e9f5 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
@@ -74,7 +74,7 @@ pub enum DroppedReason<Hash> {
 }
 
 /// Dropped-logic related event from the single view.
-pub type ViewStreamEvent<C> = crate::graph::DroppedByLimitsEvent<ExtrinsicHash<C>, BlockHash<C>>;
+pub type ViewStreamEvent<C> = crate::graph::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;
 
 /// Dropped-logic stream of events coming from the single view.
 type ViewStream<C> = Pin<Box<dyn futures::Stream<Item = ViewStreamEvent<C>> + Send>>;
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
index c609ee2da22e5e4f7d5bf1c724101bbcfdfe39d7..ffe6c20d92b72d2c3aa5c444224f6c32f83aad8e 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
@@ -23,7 +23,7 @@ use super::{
 	import_notification_sink::MultiViewImportNotificationSink,
 	metrics::MetricsLink as PrometheusMetrics,
 	multi_view_listener::MultiViewListener,
-	tx_mem_pool::{InsertionInfo, TxInMemPool, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
+	tx_mem_pool::{InsertionInfo, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER},
 	view::View,
 	view_store::ViewStore,
 };
@@ -193,7 +193,9 @@ where
 		future_limits: crate::PoolLimit,
 		mempool_max_transactions_count: usize,
 	) -> (Self, ForkAwareTxPoolTask) {
-		let listener = Arc::from(MultiViewListener::new());
+		let (listener, listener_task) = MultiViewListener::new_with_worker();
+		let listener = Arc::new(listener);
+
 		let (import_notification_sink, import_notification_sink_task) =
 			MultiViewImportNotificationSink::new_with_worker();
 
@@ -220,6 +222,7 @@ where
 
 		let combined_tasks = async move {
 			tokio::select! {
+				_ = listener_task => {},
 				_ = import_notification_sink_task => {},
 				_ = dropped_monitor_task => {}
 			}
@@ -279,14 +282,7 @@ where
 			match dropped.reason {
 				DroppedReason::Usurped(new_tx_hash) => {
 					if let Some(new_tx) = mempool.get_by_hash(new_tx_hash) {
-						view_store
-							.replace_transaction(
-								new_tx.source(),
-								new_tx.tx(),
-								tx_hash,
-								new_tx.is_watched(),
-							)
-							.await;
+						view_store.replace_transaction(new_tx.source(), new_tx.tx(), tx_hash).await;
 					} else {
 						trace!(
 							target: LOG_TARGET,
@@ -318,7 +314,10 @@ where
 		finalized_hash: Block::Hash,
 	) -> Self {
 		let metrics = PrometheusMetrics::new(prometheus);
-		let listener = Arc::from(MultiViewListener::new());
+
+		let (listener, listener_task) = MultiViewListener::new_with_worker();
+		let listener = Arc::new(listener);
+
 		let (revalidation_queue, revalidation_task) =
 			revalidation_worker::RevalidationQueue::new_with_worker();
 
@@ -347,6 +346,7 @@ where
 
 		let combined_tasks = async move {
 			tokio::select! {
+				_ = listener_task => {}
 				_ = revalidation_task => {},
 				_ = import_notification_sink_task => {},
 				_ = dropped_monitor_task => {}
@@ -1077,6 +1077,7 @@ where
 			)
 		};
 
+		let start = Instant::now();
 		// 1. Capture all import notification from the very beginning, so first register all
 		//the listeners.
 		self.import_notification_sink.add_view(
@@ -1089,16 +1090,17 @@ where
 			view.pool.validated_pool().create_dropped_by_limits_stream().boxed(),
 		);
 
-		let start = Instant::now();
-		let watched_xts = self.register_listeners(&mut view).await;
-		let duration = start.elapsed();
+		self.view_store.listener.add_view_aggregated_stream(
+			view.at.hash,
+			view.pool.validated_pool().create_aggregated_stream().boxed(),
+		);
 		// sync the transactions statuses and referencing views in all the listeners with newly
 		// cloned view.
 		view.pool.validated_pool().retrigger_notifications();
 		debug!(
 			target: LOG_TARGET,
 			?at,
-			?duration,
+			duration = ?start.elapsed(),
 			"register_listeners"
 		);
 
@@ -1106,22 +1108,20 @@ where
 		// will make some space for mempool transactions in case we are at the view's limits.
 		let start = Instant::now();
 		self.update_view_with_fork(&view, tree_route, at.clone()).await;
-		let duration = start.elapsed();
 		debug!(
 			target: LOG_TARGET,
 			?at,
-			?duration,
+			duration = ?start.elapsed(),
 			"update_view_with_fork"
 		);
 
 		// 3. Finally, submit transactions from the mempool.
 		let start = Instant::now();
-		self.update_view_with_mempool(&mut view, watched_xts).await;
-		let duration = start.elapsed();
+		self.update_view_with_mempool(&mut view).await;
 		debug!(
 			target: LOG_TARGET,
 			?at,
-			?duration,
+			duration = ?start.elapsed(),
 			"update_view_with_mempool"
 		);
 		let view = Arc::from(view);
@@ -1173,53 +1173,6 @@ where
 		all_extrinsics
 	}
 
-	/// For every watched transaction in the mempool registers a transaction listener in the view.
-	///
-	/// The transaction listener for a given view is also added to multi-view listener. This allows
-	/// to track aggreagated progress of the transaction within the transaction pool.
-	///
-	/// Function returns a list of currently watched transactions in the mempool.
-	async fn register_listeners(
-		&self,
-		view: &View<ChainApi>,
-	) -> Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)> {
-		debug!(
-			target: LOG_TARGET,
-			view_at = ?view.at,
-			xts_count = ?self.mempool.unwatched_and_watched_count(),
-			active_views_count = self.active_views_count(),
-			"register_listeners"
-		);
-
-		//todo [#5495]: maybe we don't need to register listener in view? We could use
-		// multi_view_listener.transaction_in_block
-		let results = self
-			.mempool
-			.clone_watched()
-			.into_iter()
-			.map(|(tx_hash, tx)| {
-				let watcher = view.create_watcher(tx_hash);
-				let at = view.at.clone();
-				async move {
-					trace!(
-						target: LOG_TARGET,
-						?tx_hash,
-						at = ?at.hash,
-						"adding watcher"
-					);
-					self.view_store.listener.add_view_watcher_for_tx(
-						tx_hash,
-						at.hash,
-						watcher.into_stream().boxed(),
-					);
-					(tx_hash, tx)
-				}
-			})
-			.collect::<Vec<_>>();
-
-		future::join_all(results).await
-	}
-
 	/// Updates the given view with the transactions from the internal mempol.
 	///
 	/// All transactions from the mempool (excluding those which are either already imported or
@@ -1229,15 +1182,7 @@ where
 	/// If there are no views, and mempool transaction is reported as invalid for the given view,
 	/// the transaction is reported as invalid and removed from the mempool. This does not apply to
 	/// stale and temporarily banned transactions.
-	///
-	/// As the listeners for watched transactions were registered at the very beginning of maintain
-	/// procedure (`register_listeners`), this function accepts the list of watched transactions
-	/// from the mempool for which listener was actually registered to avoid submit/maintain races.
-	async fn update_view_with_mempool(
-		&self,
-		view: &View<ChainApi>,
-		watched_xts: Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)>,
-	) {
+	async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
 		debug!(
 			target: LOG_TARGET,
 			view_at = ?view.at,
@@ -1247,15 +1192,16 @@ where
 		);
 		let included_xts = self.extrinsics_included_since_finalized(view.at.hash).await;
 
-		let (hashes, xts_filtered): (Vec<_>, Vec<_>) = watched_xts
+		let (hashes, xts_filtered): (Vec<_>, Vec<_>) = self
+			.mempool
+			.clone_transactions()
 			.into_iter()
-			.chain(self.mempool.clone_unwatched().into_iter())
 			.filter(|(hash, _)| !view.is_imported(hash))
 			.filter(|(hash, _)| !included_xts.contains(&hash))
 			.map(|(tx_hash, tx)| (tx_hash, (tx.source(), tx.tx())))
 			.unzip();
 
-		let watched_results = view
+		let results = view
 			.submit_many(xts_filtered)
 			.await
 			.into_iter()
@@ -1267,7 +1213,7 @@ where
 			})
 			.collect::<Vec<_>>();
 
-		let submitted_count = watched_results.len();
+		let submitted_count = results.len();
 
 		debug!(
 			target: LOG_TARGET,
@@ -1283,9 +1229,9 @@ where
 		// if there are no views yet, and a single newly created view is reporting error, just send
 		// out the invalid event, and remove transaction.
 		if self.view_store.is_empty() {
-			for result in watched_results {
+			for result in results {
 				if let Err(tx_hash) = result {
-					self.view_store.listener.invalidate_transactions(&[tx_hash]);
+					self.view_store.listener.transactions_invalidated(&[tx_hash]);
 					self.mempool.remove_transaction(&tx_hash);
 				}
 			}
@@ -1619,9 +1565,9 @@ where
 
 		info!(
 			target: LOG_TARGET,
-			mempool_len = format!("{:?}", self.mempool_len()),
+			txs = ?self.mempool_len(),
 			active_views_count = self.active_views_count(),
-			views_stats = ?self.views_stats(),
+			views = ?self.views_stats(),
 			?event,
 			?duration,
 			"maintain"
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs
index 5f7294a24fd75df4d4bea38ee585eb744f2123ab..2c4da0182a2524431077ed3284b406bc6fc8869c 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs
@@ -84,7 +84,8 @@
 //!
 //! ### Multi-view listeners
 //! There is a number of event streams that are provided by individual views:
-//! - [transaction status][`Watcher`],
+//! - aggregated stream of [transaction statuses][`AggregatedStream`] for all the transactions
+//!   within the view, in the form of `(transaction-hash, status)` tuples (see the sketch after
+//!   this list),
 //! - [ready notification][`vp::import_notification_stream`] (see [networking
 //!   section](#networking)),
 //! - [dropped notification][`create_dropped_by_limits_stream`].
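+//!
+//! A minimal sketch of consuming such an aggregated stream (illustrative only; the
+//! `aggregated_stream` handle is hypothetical):
+//! ```text
+//! while let Some((tx_hash, status)) = aggregated_stream.next().await {
+//!     // one `(transaction-hash, status)` event per status change within the view,
+//!     // e.g. (0xab..cd, TransactionStatus::Ready)
+//! }
+//! ```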
@@ -93,10 +94,9 @@
 //! internally). Those aggregators are often referred as multi-view listeners and they implement
 //! stream-specific or event-specific logic.
 //!
-//! The most important is [`MultiViewListener`] which is owned by view store.
-//! More information about it is provided in [transaction
-//! route](#transaction-route-submit_and_watch) section.
-//!
+//! The most important one is the [`MultiViewListener`], which is owned by the view store. Some
+//! internal details on the events' flow are provided in the [transaction
+//! status](#monitoring-the-status-of-a-transaction) section.
 //!
 //! ### Intermediate transactions buffer: [`TxMemPool`]
 //! The main purpose of an internal [`TxMemPool`] (referred to as *mempool*) is to prevent a
@@ -106,10 +106,11 @@
 //! procedure. Additionally, it allows the pool to accept transactions when no blocks have been
 //! reported yet.
 //!
-//! Since watched and non-watched transactions require a different treatment, the *mempool* keeps a
-//! track on how the transaction was submitted. The [transaction source][`TransactionSource`] used
-//! to submit transactions also needs to be kept in the *mempool*. The *mempool* transaction is a
-//! simple [wrapper][`TxInMemPool`] around the [`Arc`] reference to the actual extrinsic body.
+//! The *mempool* keeps track of how the transaction was submitted; keeping the number of watched
+//! and non-watched transactions is useful for testing and metrics. The [transaction
+//! source][`TransactionSource`] used to submit transactions also needs to be kept in the *mempool*.
+//! The *mempool* transaction is a simple [wrapper][`TxInMemPool`] around the [`Arc`] reference to
+//! the actual extrinsic body.
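+//!
+//! A simplified sketch of the shape of a *mempool* entry (illustrative only; field names are
+//! assumptions, not the actual definition of [`TxInMemPool`]):
+//! ```text
+//! struct TxInMemPool {
+//!     watched: bool,             // was the transaction submitted via `submit_and_watch`?
+//!     tx: Arc<Extrinsic>,        // shared reference to the actual extrinsic body
+//!     source: TransactionSource, // origin used when validating the transaction
+//! }
+//! ```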
 //!
 //! Once the view is created, all transactions from *mempool* are submitted to and validated at this
 //! view.
@@ -138,20 +139,37 @@
 //! ### Transaction route: [`submit_and_watch`][`api_submit_and_watch`]
 //!
 //! The [`submit_and_watch`] function allows to submit the transaction and track its
-//! [status][`TransactionStatus`] within the pool. Every view is providing an independent
-//! [stream][`View::submit_and_watch`] of events, which needs to be merged into the single stream
-//! exposed to the [external listener][`TransactionStatusStreamFor`]. For majority of events simple
-//! forwarding of events would not work (e.g. we could get multiple [`Ready`] events, or [`Ready`] /
-//! [`Future`] mix). Some additional stateful logic is required to filter and process the views'
-//! events. It is also easier to trigger some events (e.g. [`Finalized`], [`Invalid`], and
-//! [`Broadcast`]) using some side-channel and simply ignoring these events from the view. All the
-//! before mentioned functionality is provided by the [`MultiViewListener`].
-//!
-//! When watched transaction is submitted to the pool it is added the *mempool* with watched
-//! flag. The external stream for the transaction is created in a [`MultiViewListener`]. Then
-//! transaction is submitted to every active [`View`] (using
-//! [`submit_and_watch`][`View::submit_and_watch`]) and the resulting
-//! views' stream is connected to the [`MultiViewListener`].
+//! [status][`TransactionStatus`] within the pool.
+//!
+//! When a watched transaction is submitted to the pool, it is added to the *mempool* with the
+//! watched flag set. The external stream for the transaction is created in the
+//! [`MultiViewListener`]. Then the transaction is submitted to every active [`View`] (using
+//! [`submit_many`][`View::submit_many`]). The view's [aggregated
+//! stream][`create_aggregated_stream`] was already connected to the [`MultiViewListener`] when the
+//! view was created, so no additional action is required upon submission. The view will provide
+//! the required updates for all the transactions over this single stream.
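+//!
+//! A rough sketch of this flow (pseudocode, not the actual implementation):
+//! ```text
+//! submit_and_watch(tx):
+//!     mempool.insert(tx, watched = true)
+//!     external_stream = multi_view_listener.create_external_watcher_for_tx(hash(tx))
+//!     for view in active_views:
+//!         view.submit_many([tx]) // statuses flow back via the view's aggregated stream
+//!     return external_stream
+//! ```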
+//!
+//! #### Monitoring the status of a transaction
+//!
+//! Monitoring the status of a transaction and triggering events to the [external
+//! listener][`TransactionStatusStreamFor`] (e.g. an RPC client) is the responsibility of the
+//! [`MultiViewListener`].
+//!
+//! Every view provides an independent aggregated [stream][`create_aggregated_stream`] of
+//! events for all transactions in this view, which needs to be merged into the single stream
+//! exposed to the [external listener][`TransactionStatusStreamFor`] (e.g. an RPC client). For the
+//! majority of events simple forwarding would not work (e.g. we could get multiple [`Ready`]
+//! events, or a [`Ready`] / [`Future`] mix). Some additional stateful logic (implemented by
+//! [`MultiViewListener`]) is required to filter and process the views' events.
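+//!
+//! For example, only the first [`Ready`] event observed across the views is forwarded. A rough
+//! sketch of this part of the per-transaction filtering (simplified from
+//! `handle_view_transaction_status` introduced in this change):
+//! ```text
+//! TransactionStatus::Ready => {
+//!     views_keeping_tx_valid.insert(block_hash);
+//!     if ready_seen { None } else { ready_seen = true; Some(status) }
+//! }
+//! ```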
+//!
+//! It is not possible to trigger some external events (e.g., [`Dropped`], [`Finalized`],
+//! [`Invalid`], and [`Broadcast`]) using only the view-aggregated streams. These events require a
+//! pool-wide understanding of the transaction state. For example, dropping a transaction from a
+//! single view does not mean it was dropped from other views. Broadcast and finalized notifications
+//! are sent to the transaction pool API, not at the view level. These events are simply ignored
+//! when they originate in a view. The pool uses a dedicated side channel exposed by the
+//! [`MultiViewListener`] to trigger the aforementioned events.
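+//!
+//! The side-channel entry points on the [`MultiViewListener`], as introduced in this change, are
+//! roughly:
+//! ```text
+//! listener.transactions_invalidated(&[tx_hash]);
+//! listener.transactions_broadcasted(tx_hash_to_peers_map);
+//! listener.transaction_dropped(dropped);
+//! listener.transaction_finalized(tx_hash, block_hash, tx_index);
+//! ```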
 //!
 //! ### Maintain
 //! The transaction pool exposes the [task][`notification_future`] that listens to the
@@ -169,8 +187,8 @@
 //!   *mempool*
 //! 	- all transactions from the *mempool* (with some obvious filtering applied) are submitted to
 //!    the view,
-//! 	- for all watched transactions from the *mempool* the watcher is registered in the new view,
-//! 	and it is connected to the multi-view-listener,
+//! 	- the new [aggregated stream][`create_aggregated_stream`] of all transaction statuses is
+//! 	created for the new view and connected to the multi-view listener,
 //! - [update the view][ForkAwareTxPool::update_view_with_fork] with the transactions from the [tree
 //!   route][`TreeRoute`] (which is computed from the recent best block to newly notified one by
 //!   [enactment state][`EnactmentState`] helper):
@@ -292,7 +310,7 @@
 //! [`View`]: crate::fork_aware_txpool::view::View
 //! [`view::revalidate`]: crate::fork_aware_txpool::view::View::revalidate
 //! [`start_background_revalidation`]: crate::fork_aware_txpool::view::View::start_background_revalidation
-//! [`View::submit_and_watch`]: crate::fork_aware_txpool::view::View::submit_and_watch
+//! [`View::submit_many`]: crate::fork_aware_txpool::view::View::submit_many
 //! [`ViewStore`]: crate::fork_aware_txpool::view_store::ViewStore
 //! [`finish_background_revalidations`]: crate::fork_aware_txpool::view_store::ViewStore::finish_background_revalidations
 //! [find_best_view]: crate::fork_aware_txpool::view_store::ViewStore::find_best_view
@@ -305,10 +323,12 @@
 //! [`MultiViewListener`]: crate::fork_aware_txpool::multi_view_listener::MultiViewListener
 //! [`Pool`]: crate::graph::Pool
 //! [`Watcher`]: crate::graph::watcher::Watcher
+//! [`AggregatedStream`]: crate::graph::AggregatedStream
 //! [`Options`]: crate::graph::Options
 //! [`vp::import_notification_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.import_notification_stream
 //! [`vp::enforce_limits`]: ../graph/validated_pool/struct.ValidatedPool.html#method.enforce_limits
 //! [`create_dropped_by_limits_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.create_dropped_by_limits_stream
+//! [`create_aggregated_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.create_aggregated_stream
 //! [`ChainEvent`]: sc_transaction_pool_api::ChainEvent
 //! [`TransactionStatusStreamFor`]: sc_transaction_pool_api::TransactionStatusStreamFor
 //! [`api_submit`]: sc_transaction_pool_api::TransactionPool::submit_at
@@ -323,6 +343,7 @@
 //! [`Invalid`]:sc_transaction_pool_api::TransactionStatus::Invalid
 //! [`InBlock`]:sc_transaction_pool_api::TransactionStatus::InBlock
 //! [`Finalized`]:sc_transaction_pool_api::TransactionStatus::Finalized
+//! [`Dropped`]:sc_transaction_pool_api::TransactionStatus::Dropped
 //! [`ReadyTransactions`]:sc_transaction_pool_api::ReadyTransactions
 //! [`dropped_monitor_task`]: ForkAwareTxPool::dropped_monitor_task
 //! [`ready_poll`]: ForkAwareTxPool::ready_poll
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
index a513559a7cd5312b53ddcb182ca64d429a47151f..107c2941ec183470e91efb28d9d46e6156713aa0 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
@@ -21,20 +21,23 @@
 //! aggregated streams of transaction events.
 
 use crate::{
+	common::tracing_log_xt::log_xt_trace,
 	fork_aware_txpool::stream_map_util::next_event,
-	graph::{self, BlockHash, ExtrinsicHash},
+	graph::{self, BlockHash, ExtrinsicHash, TransactionStatusEvent},
 	LOG_TARGET,
 };
-use futures::StreamExt;
+use futures::{Future, FutureExt, Stream, StreamExt};
+use parking_lot::RwLock;
 use sc_transaction_pool_api::{TransactionStatus, TransactionStatusStream, TxIndex};
 use sc_utils::mpsc;
 use sp_runtime::traits::Block as BlockT;
 use std::{
 	collections::{hash_map::Entry, HashMap, HashSet},
 	pin::Pin,
+	sync::Arc,
 };
 use tokio_stream::StreamMap;
-use tracing::{debug, trace};
+use tracing::trace;
 
 use super::dropped_watcher::{DroppedReason, DroppedTransaction};
 
@@ -54,99 +57,202 @@ type CommandReceiver<T> = mpsc::TracingUnboundedReceiver<T>;
 /// It can represent both a single view's stream and an external watcher stream.
 pub type TxStatusStream<T> = Pin<Box<TransactionStatusStream<ExtrinsicHash<T>, BlockHash<T>>>>;
 
-/// Commands to control the single external stream living within the multi view listener.
-enum ControllerCommand<ChainApi: graph::ChainApi> {
-	/// Adds a new stream of transaction statuses originating in the view associated with a
-	/// specific block hash.
-	AddViewStream(BlockHash<ChainApi>, TxStatusStream<ChainApi>),
+/// An aggregated stream providing events for all transactions from the view.
+///
+/// This stream delivers updates for all transactions in the view, rather than for individual
+/// transactions.
+pub type ViewStatusStream<T> =
+	Pin<Box<dyn Stream<Item = TransactionStatusEvent<ExtrinsicHash<T>, BlockHash<T>>> + Send>>;
 
+/// Commands to control / drive the task of the multi view listener.
+enum ControllerCommand<ChainApi: graph::ChainApi> {
+	/// Requests a transaction status update. Sent by the transaction pool implementation.
+	TransactionStatusRequest(TransactionStatusUpdate<ChainApi>),
+	/// Adds a new (aggregated) stream of transaction statuses originating in the view associated
+	/// with a specific block hash.
+	AddViewStream(BlockHash<ChainApi>, ViewStatusStream<ChainApi>),
 	/// Removes an existing view's stream associated with a specific block hash.
 	RemoveViewStream(BlockHash<ChainApi>),
+}
 
+/// Represents the transaction status update performed by the transaction pool state machine. The
+/// corresponding statuses coming from the view would typically be ignored in the external watcher.
+enum TransactionStatusUpdate<ChainApi: graph::ChainApi> {
 	/// Marks a transaction as invalidated.
 	///
 	/// If all pre-conditions are met, an external invalid event will be sent out.
-	TransactionInvalidated,
+	TransactionInvalidated(ExtrinsicHash<ChainApi>),
 
 	/// Notifies that a transaction was finalized in a specific block hash and transaction index.
 	///
 	/// Send out an external finalized event.
-	FinalizeTransaction(BlockHash<ChainApi>, TxIndex),
+	TransactionFinalized(ExtrinsicHash<ChainApi>, BlockHash<ChainApi>, TxIndex),
 
 	/// Notifies that a transaction was broadcasted with a list of peer addresses.
 	///
 	/// Sends out an external broadcasted event.
-	TransactionBroadcasted(Vec<String>),
+	TransactionBroadcasted(ExtrinsicHash<ChainApi>, Vec<String>),
 
 	/// Notifies that a transaction was dropped from the pool.
 	///
 	/// If all preconditions are met, an external dropped event will be sent out.
-	TransactionDropped(DroppedReason<ExtrinsicHash<ChainApi>>),
+	TransactionDropped(ExtrinsicHash<ChainApi>, DroppedReason<ExtrinsicHash<ChainApi>>),
 }
 
-impl<ChainApi> std::fmt::Debug for ControllerCommand<ChainApi>
+impl<ChainApi> TransactionStatusUpdate<ChainApi>
+where
+	ChainApi: graph::ChainApi,
+{
+	fn hash(&self) -> ExtrinsicHash<ChainApi> {
+		match self {
+			Self::TransactionInvalidated(hash) |
+			Self::TransactionFinalized(hash, _, _) |
+			Self::TransactionBroadcasted(hash, _) |
+			Self::TransactionDropped(hash, _) => *hash,
+		}
+	}
+}
+
+impl<ChainApi> std::fmt::Debug for TransactionStatusUpdate<ChainApi>
 where
 	ChainApi: graph::ChainApi,
 {
 	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 		match self {
-			ControllerCommand::AddViewStream(h, _) => write!(f, "ListenerAction::AddView({h})"),
-			ControllerCommand::RemoveViewStream(h) => write!(f, "ListenerAction::RemoveView({h})"),
-			ControllerCommand::TransactionInvalidated => {
-				write!(f, "ListenerAction::TransactionInvalidated")
+			Self::TransactionInvalidated(h) => {
+				write!(f, "TransactionInvalidated({h})")
 			},
-			ControllerCommand::FinalizeTransaction(h, i) => {
-				write!(f, "ListenerAction::FinalizeTransaction({h},{i})")
+			Self::TransactionFinalized(h, b, i) => {
+				write!(f, "FinalizeTransaction({h},{b},{i})")
 			},
-			ControllerCommand::TransactionBroadcasted(_) => {
-				write!(f, "ListenerAction::TransactionBroadcasted(...)")
+			Self::TransactionBroadcasted(h, _) => {
+				write!(f, "TransactionBroadcasted({h})")
 			},
-			ControllerCommand::TransactionDropped(r) => {
-				write!(f, "ListenerAction::TransactionDropped {r:?}")
+			Self::TransactionDropped(h, r) => {
+				write!(f, "TransactionDropped({h},{r:?})")
+			},
+		}
+	}
+}
+
+impl<ChainApi> std::fmt::Debug for ControllerCommand<ChainApi>
+where
+	ChainApi: graph::ChainApi,
+{
+	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+		match self {
+			ControllerCommand::AddViewStream(h, _) => write!(f, "AddView({h})"),
+			ControllerCommand::RemoveViewStream(h) => write!(f, "RemoveView({h})"),
+			ControllerCommand::TransactionStatusRequest(c) => {
+				write!(f, "TransactionStatusRequest({c:?})")
 			},
 		}
 	}
 }
+impl<ChainApi> ControllerCommand<ChainApi>
+where
+	ChainApi: graph::ChainApi,
+{
+	/// Creates a new instance of a command requesting the [`TransactionStatus::Invalid`]
+	/// transaction status.
+	fn new_transaction_invalidated(tx_hash: ExtrinsicHash<ChainApi>) -> Self {
+		ControllerCommand::TransactionStatusRequest(
+			TransactionStatusUpdate::TransactionInvalidated(tx_hash),
+		)
+	}
+	/// Creates a new instance of a command requesting the [`TransactionStatus::Broadcast`]
+	/// transaction status.
+	fn new_transaction_broadcasted(tx_hash: ExtrinsicHash<ChainApi>, peers: Vec<String>) -> Self {
+		ControllerCommand::TransactionStatusRequest(
+			TransactionStatusUpdate::TransactionBroadcasted(tx_hash, peers),
+		)
+	}
+	/// Creates a new instance of a command requesting the [`TransactionStatus::Finalized`]
+	/// transaction status.
+	fn new_transaction_finalized(
+		tx_hash: ExtrinsicHash<ChainApi>,
+		block_hash: BlockHash<ChainApi>,
+		index: TxIndex,
+	) -> Self {
+		ControllerCommand::TransactionStatusRequest(TransactionStatusUpdate::TransactionFinalized(
+			tx_hash, block_hash, index,
+		))
+	}
+	/// Creates a new instance of a command requesting the [`TransactionStatus::Dropped`]
+	/// transaction status.
+	fn new_transaction_dropped(
+		tx_hash: ExtrinsicHash<ChainApi>,
+		reason: DroppedReason<ExtrinsicHash<ChainApi>>,
+	) -> Self {
+		ControllerCommand::TransactionStatusRequest(TransactionStatusUpdate::TransactionDropped(
+			tx_hash, reason,
+		))
+	}
+}
 
 /// This struct allows to create and control listener for multiple transactions.
 ///
-/// For every transaction the view's stream generating its own events can be added. The events are
-/// flattened and sent out to the external listener. (The *external*  term here means that it can be
-/// exposed to [`sc_transaction_pool_api::TransactionPool`] API client e.g. over RPC.)
+/// For every view, an aggregated stream of transaction events can be added. The events are
+/// flattened and sent out to the external listener for individual transactions. (The *external*
+/// term here means that it can be exposed to [`sc_transaction_pool_api::TransactionPool`] API
+/// client e.g. over RPC.)
 ///
-/// The listener allows to add and remove view's stream (per transaction).
+/// The listener allows adding and removing views' streams.
 ///
 /// The listener provides a side channel that allows triggering specific events (finalized, dropped,
-/// invalid) independently of the view's stream.
+/// invalid, broadcast) independently of the view's stream.
 pub struct MultiViewListener<ChainApi: graph::ChainApi> {
-	/// Provides the set of controllers for the events streams corresponding to individual
-	/// transactions identified by transaction hashes.
-	controllers: parking_lot::RwLock<
-		HashMap<ExtrinsicHash<ChainApi>, Controller<ControllerCommand<ChainApi>>>,
-	>,
+	/// Provides the controller for sending control commands to the listener's task.
+	controller: Controller<ControllerCommand<ChainApi>>,
+
+	/// The map containing the sinks of the streams representing the external listeners of
+	/// the individual transactions. The hash of the transaction is used as the map's key. The map
+	/// is shared with the listener's task.
+	external_controllers:
+		Arc<RwLock<HashMap<ExtrinsicHash<ChainApi>, Controller<ExternalWatcherCommand<ChainApi>>>>>,
 }
 
+/// A type representing a `MultiViewListener` task. For more details refer to
+/// [`MultiViewListener::task`].
+pub type MultiViewListenerTask = Pin<Box<dyn Future<Output = ()> + Send>>;
+
 /// The external stream unfolding context.
 ///
-/// This context is used to unfold the external events stream for a single transaction, it
-/// facilitates the logic of converting single view's events to the external events stream.
+/// This context is used to unfold the external events stream for an individual transaction; it
+/// facilitates the logic of converting events incoming from numerous views into the external
+/// events stream.
 struct ExternalWatcherContext<ChainApi: graph::ChainApi> {
 	/// The hash of the transaction being monitored within this context.
 	tx_hash: ExtrinsicHash<ChainApi>,
-	/// A stream map of transaction status streams coming from individual views, keyed by
-	/// block hash associated with view.
-	status_stream_map: StreamMap<BlockHash<ChainApi>, TxStatusStream<ChainApi>>,
-	/// A receiver for controller commands.
-	command_receiver: CommandReceiver<ControllerCommand<ChainApi>>,
+	/// A receiver for controller commands sent by [`MultiViewListener`]'s task.
+	command_receiver: CommandReceiver<ExternalWatcherCommand<ChainApi>>,
 	/// A flag indicating whether the context should terminate.
 	terminate: bool,
 	/// A flag indicating if a `Future` status has been encountered.
 	future_seen: bool,
 	/// A flag indicating if a `Ready` status has been encountered.
 	ready_seen: bool,
-
 	/// A hash set of block hashes from views that consider the transaction valid.
 	views_keeping_tx_valid: HashSet<BlockHash<ChainApi>>,
+	/// The set of views (represented by block hashes) currently maintained by the transaction
+	/// pool.
+	known_views: HashSet<BlockHash<ChainApi>>,
+}
+
+/// Commands to control the single external stream living within the multi view listener. These
+/// commands are sent from the listener's task to the [`ExternalWatcherContext`].
+enum ExternalWatcherCommand<ChainApi: graph::ChainApi> {
+	/// Command for triggering some of the transaction statuses that are decided by the pool logic.
+	PoolTransactionStatus(TransactionStatusUpdate<ChainApi>),
+	/// Transaction status updates coming from the individual views.
+	ViewTransactionStatus(
+		BlockHash<ChainApi>,
+		TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+	),
+	/// Notification about new view being added.
+	AddView(BlockHash<ChainApi>),
+	/// Notification about view being removed.
+	RemoveView(BlockHash<ChainApi>),
 }
 
 impl<ChainApi: graph::ChainApi> ExternalWatcherContext<ChainApi>
@@ -155,44 +261,85 @@ where
 {
 	/// Creates new `ExternalWatcherContext` for particular transaction identified by `tx_hash`
 	///
-	/// The `command_receiver` is a side channel for receiving controller's commands.
+	/// The `command_receiver` is a side channel for receiving the controller's
+	/// [commands][`ExternalWatcherCommand`].
 	fn new(
 		tx_hash: ExtrinsicHash<ChainApi>,
-		command_receiver: CommandReceiver<ControllerCommand<ChainApi>>,
+		command_receiver: CommandReceiver<ExternalWatcherCommand<ChainApi>>,
 	) -> Self {
 		Self {
 			tx_hash,
-			status_stream_map: StreamMap::new(),
 			command_receiver,
 			terminate: false,
 			future_seen: false,
 			ready_seen: false,
 			views_keeping_tx_valid: Default::default(),
+			known_views: Default::default(),
 		}
 	}
 
-	/// Handles various transaction status updates and manages internal states based on the status.
+	/// Handles transaction status updates from the pool and manages internal states based on the
+	/// input value.
+	///
+	/// Function may set the context termination flag, which will close the stream.
+	///
+	/// Returns `Some` with the `event` to be sent out or `None`.
+	fn handle_pool_transaction_status(
+		&mut self,
+		request: TransactionStatusUpdate<ChainApi>,
+	) -> Option<TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>> {
+		match request {
+			TransactionStatusUpdate::TransactionInvalidated(..) =>
+				if self.handle_invalidate_transaction() {
+					log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Invalid", self.tx_hash);
+					return Some(TransactionStatus::Invalid)
+				},
+			TransactionStatusUpdate::TransactionFinalized(_, block, index) => {
+				log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Finalized", self.tx_hash);
+				self.terminate = true;
+				return Some(TransactionStatus::Finalized((block, index)))
+			},
+			TransactionStatusUpdate::TransactionBroadcasted(_, peers) => {
+				log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Broadcasted", self.tx_hash);
+				return Some(TransactionStatus::Broadcast(peers))
+			},
+			TransactionStatusUpdate::TransactionDropped(_, DroppedReason::LimitsEnforced) => {
+				log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Dropped", self.tx_hash);
+				self.terminate = true;
+				return Some(TransactionStatus::Dropped)
+			},
+			TransactionStatusUpdate::TransactionDropped(_, DroppedReason::Usurped(by)) => {
+				log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Usurped({:?})", self.tx_hash, by);
+				self.terminate = true;
+				return Some(TransactionStatus::Usurped(by))
+			},
+		};
+		None
+	}
+
+	/// Handles various transaction status updates from individual views and manages internal states
+	/// based on the input value.
 	///
 	/// Function may set the context termination flag, which will close the stream.
 	///
-	/// Returns `Some` with the `event` to forward or `None`.
-	fn handle(
+	/// Returns `Some` with the `event` to be sent out or `None`.
+	fn handle_view_transaction_status(
 		&mut self,
+		block_hash: BlockHash<ChainApi>,
 		status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
-		hash: BlockHash<ChainApi>,
 	) -> Option<TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>> {
 		trace!(
 			target: LOG_TARGET,
 			tx_hash = ?self.tx_hash,
-			?hash,
+			?block_hash,
 			?status,
-			views = ?self.status_stream_map.keys().collect::<Vec<_>>(),
+			views = ?self.known_views.iter().collect::<Vec<_>>(),
 			"mvl handle event"
 		);
 
 		match status {
 			TransactionStatus::Future => {
-				self.views_keeping_tx_valid.insert(hash);
+				self.views_keeping_tx_valid.insert(block_hash);
 				if self.ready_seen || self.future_seen {
 					None
 				} else {
@@ -201,7 +348,7 @@ where
 				}
 			},
 			TransactionStatus::Ready => {
-				self.views_keeping_tx_valid.insert(hash);
+				self.views_keeping_tx_valid.insert(block_hash);
 				if self.ready_seen {
 					None
 				} else {
@@ -209,9 +356,8 @@ where
 					Some(status)
 				}
 			},
-			TransactionStatus::Broadcast(_) => None,
 			TransactionStatus::InBlock((..)) => {
-				self.views_keeping_tx_valid.insert(hash);
+				self.views_keeping_tx_valid.insert(block_hash);
 				if !(self.ready_seen || self.future_seen) {
 					self.ready_seen = true;
 					Some(status)
@@ -219,12 +365,13 @@ where
 					Some(status)
 				}
 			},
-			TransactionStatus::Retracted(_) => None,
 			TransactionStatus::FinalityTimeout(_) => Some(status),
 			TransactionStatus::Finalized(_) => {
 				self.terminate = true;
 				Some(status)
 			},
+			TransactionStatus::Retracted(_) |
+			TransactionStatus::Broadcast(_) |
 			TransactionStatus::Usurped(_) |
 			TransactionStatus::Dropped |
 			TransactionStatus::Invalid => None,
@@ -238,13 +385,11 @@ where
 	/// Returns true if the event should be sent out, and false if the invalidation request should
 	/// be skipped.
 	fn handle_invalidate_transaction(&mut self) -> bool {
-		let keys = HashSet::<BlockHash<ChainApi>>::from_iter(
-			self.status_stream_map.keys().map(Clone::clone),
-		);
+		let keys = self.known_views.clone();
 		trace!(
 			target: LOG_TARGET,
 			tx_hash = ?self.tx_hash,
-			views = ?self.status_stream_map.keys().collect::<Vec<_>>(),
+			views = ?self.known_views.iter().collect::<Vec<_>>(),
 			"got invalidate_transaction"
 		);
 		if self.views_keeping_tx_valid.is_disjoint(&keys) {
@@ -261,33 +406,33 @@ where
 		}
 	}
 
-	/// Adds a new transaction status stream.
+	/// Adds a new view to the context.
 	///
-	/// Inserts a new view's transaction status stream associated with a specific block hash into
-	/// the stream map.
-	fn add_stream(&mut self, block_hash: BlockHash<ChainApi>, stream: TxStatusStream<ChainApi>) {
-		self.status_stream_map.insert(block_hash, stream);
+	/// Registers a new view, represented by `block_hash`, in the set of views known by this
+	/// context.
+	fn add_view(&mut self, block_hash: BlockHash<ChainApi>) {
 		trace!(
 			target: LOG_TARGET,
 			tx_hash = ?self.tx_hash,
 			?block_hash,
-			views = ?self.status_stream_map.keys().collect::<Vec<_>>(),
+			views = ?self.known_views.iter().collect::<Vec<_>>(),
 			"AddView view"
 		);
+		self.known_views.insert(block_hash);
 	}
 
-	/// Removes an existing transaction status stream.
+	/// Removes an existing view from the context.
 	///
-	/// Removes a transaction status stream associated with a specific block hash from the
-	/// stream map.
+	/// Removes the view represented by `block_hash` from the set of views known by this
+	/// context.
 	fn remove_view(&mut self, block_hash: BlockHash<ChainApi>) {
-		self.status_stream_map.remove(&block_hash);
+		self.known_views.remove(&block_hash);
 		self.views_keeping_tx_valid.remove(&block_hash);
 		trace!(
 			target: LOG_TARGET,
 			tx_hash = ?self.tx_hash,
 			?block_hash,
-			views = ?self.status_stream_map.keys().collect::<Vec<_>>(),
+			views = ?self.known_views.iter().collect::<Vec<_>>(),
 			"RemoveView view"
 		);
 	}
@@ -298,125 +443,180 @@ where
 	ChainApi: graph::ChainApi + 'static,
 	<<ChainApi as graph::ChainApi>::Block as BlockT>::Hash: Unpin,
 {
-	/// Creates new instance of `MultiViewListener`.
-	pub fn new() -> Self {
-		Self { controllers: Default::default() }
+	/// A worker task associated with a `MultiViewListener` instance.
+	///
+	/// An asynchronous listener task responsible for dispatching:
+	/// - the stream map containing aggregated transaction status streams from multiple views,
+	/// - view add/remove requests,
+	/// - transaction commands,
+	/// to the individual per-transaction external watcher contexts.
+	///
+	/// The future shall be polled by the instantiator of `MultiViewListener`.
+	async fn task(
+		external_watchers_tx_hash_map: Arc<
+			RwLock<HashMap<ExtrinsicHash<ChainApi>, Controller<ExternalWatcherCommand<ChainApi>>>>,
+		>,
+		mut command_receiver: CommandReceiver<ControllerCommand<ChainApi>>,
+	) {
+		let mut aggregated_streams_map: StreamMap<BlockHash<ChainApi>, ViewStatusStream<ChainApi>> =
+			Default::default();
+
+		loop {
+			tokio::select! {
+				biased;
+				Some((view_hash, (tx_hash, status))) = next_event(&mut aggregated_streams_map) => {
+					if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) {
+						log::trace!(
+							target: LOG_TARGET,
+							"[{:?}] aggregated_stream_map event: view:{} status:{:?}",
+							tx_hash,
+							view_hash,
+							status
+						);
+						if let Err(e) = ctrl
+							.get_mut()
+							.unbounded_send(ExternalWatcherCommand::ViewTransactionStatus(view_hash, status))
+						{
+							trace!(target: LOG_TARGET, "[{:?}] send status failed: {:?}", tx_hash, e);
+							ctrl.remove();
+						}
+					}
+				},
+				cmd = command_receiver.next() => {
+					log::trace!(target: LOG_TARGET, "cmd {:?}", cmd);
+					match cmd {
+						Some(ControllerCommand::AddViewStream(h,stream)) => {
+							aggregated_streams_map.insert(h,stream);
+							//todo: async and join all?
+							external_watchers_tx_hash_map.write().retain(|tx_hash, ctrl| {
+								ctrl.unbounded_send(ExternalWatcherCommand::AddView(h))
+									.inspect_err(|e| {
+										trace!(target: LOG_TARGET, "[{:?}] add_view: send message failed: {:?}", tx_hash, e);
+									})
+									.is_ok()
+							})
+						},
+						Some(ControllerCommand::RemoveViewStream(h)) => {
+							aggregated_streams_map.remove(&h);
+							//todo: async and join all?
+							external_watchers_tx_hash_map.write().retain(|tx_hash, ctrl| {
+								ctrl.unbounded_send(ExternalWatcherCommand::RemoveView(h))
+									.inspect_err(|e| {
+										trace!(target: LOG_TARGET, "[{:?}] remove_view: send message failed: {:?}", tx_hash, e);
+									})
+									.is_ok()
+							})
+						},
+
+						Some(ControllerCommand::TransactionStatusRequest(request)) => {
+							let tx_hash = request.hash();
+							if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) {
+								if let Err(e) = ctrl
+									.get_mut()
+									.unbounded_send(ExternalWatcherCommand::PoolTransactionStatus(request))
+								{
+									trace!(target: LOG_TARGET, "[{:?}] send message failed: {:?}", tx_hash, e);
+									ctrl.remove();
+								}
+							}
+						},
+						None => {}
+					}
+				},
+			};
+		}
 	}
 
-	/// Returns `true` if the listener contains a stream controller for the specified hash.
-	pub fn contains_tx(&self, tx_hash: &ExtrinsicHash<ChainApi>) -> bool {
-		self.controllers.read().contains_key(tx_hash)
+	/// Creates a new [`MultiViewListener`] instance along with its associated worker task.
+	///
+	/// This function instantiates the new `MultiViewListener` and provides the worker task that
+	/// relays messages to the external transaction listeners. The task shall be polled by the
+	/// caller.
+	///
+	/// Returns a tuple containing the [`MultiViewListener`] and the
+	/// [`MultiViewListenerTask`].
+	pub fn new_with_worker() -> (Self, MultiViewListenerTask) {
+		let external_controllers = Arc::from(RwLock::from(HashMap::<
+			ExtrinsicHash<ChainApi>,
+			Controller<ExternalWatcherCommand<ChainApi>>,
+		>::default()));
+
+		const CONTROLLER_QUEUE_WARN_SIZE: usize = 100_000;
+		let (tx, rx) = mpsc::tracing_unbounded(
+			"txpool-multi-view-listener-task-controller",
+			CONTROLLER_QUEUE_WARN_SIZE,
+		);
+		let task = Self::task(external_controllers.clone(), rx);
+
+		(Self { external_controllers, controller: tx }, task.boxed())
 	}
 
-	/// Creates an external aggregated stream of events for given transaction.
+	/// Creates an external stream of events for the given transaction.
 	///
 	/// This method initializes an `ExternalWatcherContext` for the provided transaction hash, sets
-	/// up the necessary communication channels, and unfolds an external (meaning that it can be
-	/// exposed to [`sc_transaction_pool_api::TransactionPool`] API client e.g. rpc) stream of
-	/// transaction status events. If an external watcher is already present for the given
-	/// transaction, it returns `None`.
+	/// up the necessary communication channel with the listener's task, and unfolds an external
+	/// (meaning that it can be exposed to [`sc_transaction_pool_api::TransactionPool`] API client
+	/// e.g. rpc) stream of transaction status events. If an external watcher is already present for
+	/// the given transaction, it returns `None`.
 	pub(crate) fn create_external_watcher_for_tx(
 		&self,
 		tx_hash: ExtrinsicHash<ChainApi>,
 	) -> Option<TxStatusStream<ChainApi>> {
-		let mut controllers = self.controllers.write();
-		if controllers.contains_key(&tx_hash) {
-			return None
-		}
+		let external_ctx = match self.external_controllers.write().entry(tx_hash) {
+			Entry::Occupied(_) => return None,
+			Entry::Vacant(entry) => {
+				const EXT_CONTROLLER_QUEUE_WARN_THRESHOLD: usize = 128;
+				let (tx, rx) = mpsc::tracing_unbounded(
+					"txpool-multi-view-listener",
+					EXT_CONTROLLER_QUEUE_WARN_THRESHOLD,
+				);
+				entry.insert(tx);
+				ExternalWatcherContext::new(tx_hash, rx)
+			},
+		};
 
 		trace!(
 			target: LOG_TARGET,
 			?tx_hash,
 			"create_external_watcher_for_tx"
 		);
-		let (tx, rx) = mpsc::tracing_unbounded("txpool-multi-view-listener", 32);
-		controllers.insert(tx_hash, tx);
-
-		let ctx = ExternalWatcherContext::new(tx_hash, rx);
 
 		Some(
-			futures::stream::unfold(ctx, |mut ctx| async move {
+			futures::stream::unfold(external_ctx, |mut ctx| async move {
 				if ctx.terminate {
+					log::trace!(target: LOG_TARGET, "[{:?}] terminate", ctx.tx_hash);
 					return None
 				}
 				loop {
 					tokio::select! {
-						biased;
-						Some((view_hash, status)) =  next_event(&mut ctx.status_stream_map) => {
-							if let Some(new_status) = ctx.handle(status, view_hash) {
-								trace!(
-									target: LOG_TARGET,
-									tx_hash = ?ctx.tx_hash,
-									?new_status,
-									"mvl sending out"
-								);
-							return Some((new_status, ctx))
-							}
-						},
 						cmd = ctx.command_receiver.next() => {
-							trace!(
-								target: LOG_TARGET,
-								tx_hash = ?ctx.tx_hash,
-								views = ?ctx.status_stream_map.keys().collect::<Vec<_>>(),
-								"select::rx"
-							);
 							match cmd? {
-								ControllerCommand::AddViewStream(h,stream) => {
-									ctx.add_stream(h, stream);
-								},
-								ControllerCommand::RemoveViewStream(h) => {
-									ctx.remove_view(h);
-								},
-								ControllerCommand::TransactionInvalidated => {
-									if ctx.handle_invalidate_transaction() {
+								ExternalWatcherCommand::ViewTransactionStatus(view_hash, status) => {
+									if let Some(new_status) = ctx.handle_view_transaction_status(view_hash, status) {
 										trace!(
 											target: LOG_TARGET,
 											tx_hash = ?ctx.tx_hash,
-											status = "Invalid",
+											?new_status,
 											"mvl sending out"
 										);
-										return Some((TransactionStatus::Invalid, ctx))
+										return Some((new_status, ctx))
 									}
 								},
-								ControllerCommand::FinalizeTransaction(block, index) => {
-									trace!(
-										target: LOG_TARGET,
-										tx_hash = ?ctx.tx_hash,
-										status = "Finalized",
-										"mvl sending out"
-									);
-									ctx.terminate = true;
-									return Some((TransactionStatus::Finalized((block, index)), ctx))
-								},
-								ControllerCommand::TransactionBroadcasted(peers) => {
-									trace!(
-										target: LOG_TARGET,
-										tx_hash = ?ctx.tx_hash,
-										status = "Broadcasted",
-										"mvl sending out"
-									);
-									return Some((TransactionStatus::Broadcast(peers), ctx))
-								},
-								ControllerCommand::TransactionDropped(DroppedReason::LimitsEnforced) => {
-									trace!(
-										target: LOG_TARGET,
-										tx_hash = ?ctx.tx_hash,
-										status = "Dropped",
-										"mvl sending out"
-									);
-									ctx.terminate = true;
-									return Some((TransactionStatus::Dropped, ctx))
+								ExternalWatcherCommand::PoolTransactionStatus(request) => {
+									if let Some(new_status) = ctx.handle_pool_transaction_status(request) {
+										trace!(
+											target: LOG_TARGET,
+											tx_hash = ?ctx.tx_hash,
+											?new_status,
+											"mvl sending out"
+										);
+										return Some((new_status, ctx))
+									}
+								}
+								ExternalWatcherCommand::AddView(h) => {
+									ctx.add_view(h);
 								},
-								ControllerCommand::TransactionDropped(DroppedReason::Usurped(by)) => {
-									trace!(
-										target: LOG_TARGET,
-										tx_hash = ?ctx.tx_hash,
-										status = "Usurped",
-										?by,
-										"mvl sending out"
-									);
-									ctx.terminate = true;
-									return Some((TransactionStatus::Usurped(by), ctx))
+								ExternalWatcherCommand::RemoveView(h) => {
+									ctx.remove_view(h);
 								},
 							}
 						},
@@ -427,178 +627,142 @@ where
 		)
 	}
 
-	/// Adds a view's transaction status stream for particular transaction.
+	/// Adds a view's aggregated transaction status stream.
+	///
+	/// This method sends an `AddViewStream` command to the task, from where it is further dispatched
+	/// to the external watcher context for every watched transaction.
 	///
-	/// This method sends a `AddViewStream` command to the controller of each transaction to
-	/// remove the view's stream corresponding to the given block hash.
-	pub(crate) fn add_view_watcher_for_tx(
+	/// The stream is associated with a view represented by `block_hash`.
+	pub(crate) fn add_view_aggregated_stream(
 		&self,
-		tx_hash: ExtrinsicHash<ChainApi>,
 		block_hash: BlockHash<ChainApi>,
-		stream: TxStatusStream<ChainApi>,
+		stream: ViewStatusStream<ChainApi>,
 	) {
-		let mut controllers = self.controllers.write();
-
-		if let Entry::Occupied(mut tx) = controllers.entry(tx_hash) {
-			if let Err(error) = tx
-				.get_mut()
-				.unbounded_send(ControllerCommand::AddViewStream(block_hash, stream))
-			{
-				trace!(
-					target: LOG_TARGET,
-					?tx_hash,
-					%error,
-					"add_view_watcher_for_tx: send message failed"
-				);
-				tx.remove();
-			}
+		trace!(target: LOG_TARGET, ?block_hash, "mvl::add_view_aggregated_stream");
+		if let Err(error) = self
+			.controller
+			.unbounded_send(ControllerCommand::AddViewStream(block_hash, stream))
+		{
+			trace!(
+				target: LOG_TARGET,
+				?block_hash,
+				%error,
+				"add_view_aggregated_stream: send message failed"
+			);
 		}
 	}
 
-	/// Removes a view's stream associated with a specific view hash across all transactions.
+	/// Removes a view's stream associated with a specific view hash.
 	///
-	/// This method sends a `RemoveViewStream` command to the controller of each transaction to
-	/// remove the view's stream corresponding to the given block hash.
+	/// This method sends a `RemoveViewStream` command to the listener's task, from where it is
+	/// further dispatched to the external watcher context of every watched transaction.
 	pub(crate) fn remove_view(&self, block_hash: BlockHash<ChainApi>) {
-		self.controllers.write().retain(|tx_hash, sender| {
-			sender
-				.unbounded_send(ControllerCommand::RemoveViewStream(block_hash))
-				.map_err(|error| {
-					trace!(
-						target: LOG_TARGET,
-						?tx_hash,
-						%error,
-						"remove_view: send message failed"
-					);
-					error
-				})
-				.is_ok()
-		});
+		trace!(target: LOG_TARGET, ?block_hash, "mvl::remove_view");
+		if let Err(error) =
+			self.controller.unbounded_send(ControllerCommand::RemoveViewStream(block_hash))
+		{
+			trace!(
+				target: LOG_TARGET,
+				?block_hash,
+				%error,
+				"remove_view: send message failed"
+			);
+		}
 	}
 
 	/// Invalidate given transaction.
 	///
-	/// This method sends a `TransactionInvalidated` command to the controller of each transaction
-	/// provided to process the invalidation request.
+	/// This method sends a `TransactionInvalidated` command to the task's controller for each
+	/// transaction provided, to process the invalidation request.
 	///
 	/// The external event will be sent if no view is referencing the transaction as `Ready` or
 	/// `Future`.
-	pub(crate) fn invalidate_transactions(&self, invalid_hashes: &[ExtrinsicHash<ChainApi>]) {
-		let mut controllers = self.controllers.write();
-		invalid_hashes.iter().for_each(|tx_hash| {
-			if let Entry::Occupied(mut tx) = controllers.entry(*tx_hash) {
+	pub(crate) fn transactions_invalidated(&self, invalid_hashes: &[ExtrinsicHash<ChainApi>]) {
+		log_xt_trace!(target: LOG_TARGET, invalid_hashes, "transactions_invalidated");
+		for tx_hash in invalid_hashes {
+			if let Err(error) = self
+				.controller
+				.unbounded_send(ControllerCommand::new_transaction_invalidated(*tx_hash))
+			{
 				trace!(
 					target: LOG_TARGET,
 					?tx_hash,
-					"invalidate_transaction"
+					%error,
+					"transactions_invalidated: send message failed"
 				);
-				if let Err(error) =
-					tx.get_mut().unbounded_send(ControllerCommand::TransactionInvalidated)
-				{
-					trace!(
-						target: LOG_TARGET,
-						?tx_hash,
-						%error,
-						"invalidate_transaction: send message failed"
-					);
-					tx.remove();
-				}
 			}
-		});
+		}
 	}
 
 	/// Send `Broadcasted` event to listeners of all transactions.
 	///
-	/// This method sends a `TransactionBroadcasted` command to the controller of each transaction
-	/// provided prompting the external `Broadcasted` event.
+	/// This method sends a `TransactionBroadcasted` command to the task's controller for each
+	/// transaction provided. It will prompt the external `Broadcasted` event.
 	pub(crate) fn transactions_broadcasted(
 		&self,
 		propagated: HashMap<ExtrinsicHash<ChainApi>, Vec<String>>,
 	) {
-		let mut controllers = self.controllers.write();
-		propagated.into_iter().for_each(|(tx_hash, peers)| {
-			if let Entry::Occupied(mut tx) = controllers.entry(tx_hash) {
+		for (tx_hash, peers) in propagated {
+			if let Err(error) = self
+				.controller
+				.unbounded_send(ControllerCommand::new_transaction_broadcasted(tx_hash, peers))
+			{
 				trace!(
 					target: LOG_TARGET,
 					?tx_hash,
-					"transaction_broadcasted"
+					%error,
+					"transactions_broadcasted: send message failed"
 				);
-				if let Err(error) =
-					tx.get_mut().unbounded_send(ControllerCommand::TransactionBroadcasted(peers))
-				{
-					trace!(
-						target: LOG_TARGET,
-						?tx_hash,
-						%error,
-						"transactions_broadcasted: send message failed"
-					);
-					tx.remove();
-				}
 			}
-		});
+		}
 	}
 
 	/// Send `Dropped` event to listeners of transactions.
 	///
-	/// This method sends a `TransactionDropped` command to the controller of each requested
-	/// transaction prompting and external `Broadcasted` event.
+	/// This method sends a `TransactionDropped` command to the task's controller. It will prompt
+	/// the external `Dropped` event.
 	pub(crate) fn transaction_dropped(&self, dropped: DroppedTransaction<ExtrinsicHash<ChainApi>>) {
-		let mut controllers = self.controllers.write();
-		debug!(
-			target: LOG_TARGET,
-			?dropped,
-			"mvl::transaction_dropped"
-		);
-		if let Some(tx) = controllers.remove(&dropped.tx_hash) {
-			let DroppedTransaction { tx_hash, reason } = dropped;
-			debug!(
+		let DroppedTransaction { tx_hash, reason } = dropped;
+		trace!(target: LOG_TARGET, ?tx_hash, ?reason, "transaction_dropped");
+		if let Err(error) = self
+			.controller
+			.unbounded_send(ControllerCommand::new_transaction_dropped(tx_hash, reason))
+		{
+			trace!(
 				target: LOG_TARGET,
 				?tx_hash,
-				"transaction_dropped"
+				%error,
+				"transaction_dropped: send message failed"
 			);
-			if let Err(error) = tx.unbounded_send(ControllerCommand::TransactionDropped(reason)) {
-				trace!(
-					target: LOG_TARGET,
-					?tx_hash,
-					%error,
-					"transaction_dropped: send message failed"
-				);
-			};
 		}
 	}
 
 	/// Send `Finalized` event for given transaction at given block.
 	///
-	/// This will send `Finalized` event to the external watcher.
-	pub(crate) fn finalize_transaction(
+	/// This will trigger the `Finalized` event for the external watcher.
+	pub(crate) fn transaction_finalized(
 		&self,
 		tx_hash: ExtrinsicHash<ChainApi>,
 		block: BlockHash<ChainApi>,
 		idx: TxIndex,
 	) {
-		let mut controllers = self.controllers.write();
-		if let Some(tx) = controllers.remove(&tx_hash) {
+		trace!(target: LOG_TARGET, ?tx_hash, "transaction_finalized");
+		if let Err(error) = self
+			.controller
+			.unbounded_send(ControllerCommand::new_transaction_finalized(tx_hash, block, idx))
+		{
 			trace!(
 				target: LOG_TARGET,
 				?tx_hash,
-				"finalize_transaction"
+				%error,
+				"transaction_finalized: send message failed"
 			);
-			if let Err(error) =
-				tx.unbounded_send(ControllerCommand::FinalizeTransaction(block, idx))
-			{
-				trace!(
-					target: LOG_TARGET,
-					?tx_hash,
-					%error,
-					"finalize_transaction: send message failed"
-				);
-			}
 		};
 	}
 
 	/// Removes stale controllers.
 	pub(crate) fn remove_stale_controllers(&self) {
-		self.controllers.write().retain(|_, c| !c.is_closed());
+		self.external_controllers.write().retain(|_, c| !c.is_closed());
 	}
 }
 
@@ -608,38 +772,60 @@ mod tests {
 	use crate::common::tests::TestApi;
 	use futures::{stream, StreamExt};
 	use sp_core::H256;
+	use tokio::{select, task::JoinHandle};
+	use tracing::debug;
 
 	type MultiViewListener = super::MultiViewListener<TestApi>;
 
+	fn create_multi_view_listener(
+	) -> (MultiViewListener, tokio::sync::oneshot::Sender<()>, JoinHandle<()>) {
+		let (listener, listener_task) = MultiViewListener::new_with_worker();
+
+		let (tx, rx) = tokio::sync::oneshot::channel();
+
+		let listener_handle = tokio::spawn(async move {
+			select! {
+				_ = listener_task => {},
+				_ = rx => { return; }
+			}
+		});
+
+		(listener, tx, listener_handle)
+	}
+
 	#[tokio::test]
 	async fn test01() {
 		sp_tracing::try_init_simple();
-		let listener = MultiViewListener::new();
+		let (listener, terminate_listener, listener_task) = create_multi_view_listener();
 
 		let block_hash = H256::repeat_byte(0x01);
+		let tx_hash = H256::repeat_byte(0x0a);
 		let events = vec![
 			TransactionStatus::Ready,
 			TransactionStatus::InBlock((block_hash, 0)),
 			TransactionStatus::Finalized((block_hash, 0)),
 		];
 
-		let tx_hash = H256::repeat_byte(0x0a);
 		let external_watcher = listener.create_external_watcher_for_tx(tx_hash).unwrap();
 		let handle = tokio::spawn(async move { external_watcher.collect::<Vec<_>>().await });
 
-		let view_stream = futures::stream::iter(events.clone());
+		let view_stream =
+			futures::stream::iter(std::iter::repeat(tx_hash).zip(events.clone().into_iter()));
 
-		listener.add_view_watcher_for_tx(tx_hash, block_hash, view_stream.boxed());
+		listener.add_view_aggregated_stream(block_hash, view_stream.boxed());
 
 		let out = handle.await.unwrap();
 		assert_eq!(out, events);
 		debug!("out: {:#?}", out);
+
+		let _ = terminate_listener.send(());
+		let _ = listener_task.await.unwrap();
 	}
 
 	#[tokio::test]
 	async fn test02() {
 		sp_tracing::try_init_simple();
-		let listener = MultiViewListener::new();
+		let (listener, terminate_listener, listener_task) = create_multi_view_listener();
 
 		let block_hash0 = H256::repeat_byte(0x01);
 		let events0 = vec![
@@ -658,13 +844,15 @@ mod tests {
 		let tx_hash = H256::repeat_byte(0x0a);
 		let external_watcher = listener.create_external_watcher_for_tx(tx_hash).unwrap();
 
-		let view_stream0 = futures::stream::iter(events0.clone());
-		let view_stream1 = futures::stream::iter(events1.clone());
+		let view_stream0 =
+			futures::stream::iter(std::iter::repeat(tx_hash).zip(events0.clone().into_iter()));
+		let view_stream1 =
+			futures::stream::iter(std::iter::repeat(tx_hash).zip(events1.clone().into_iter()));
 
 		let handle = tokio::spawn(async move { external_watcher.collect::<Vec<_>>().await });
 
-		listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed());
-		listener.add_view_watcher_for_tx(tx_hash, block_hash1, view_stream1.boxed());
+		listener.add_view_aggregated_stream(block_hash0, view_stream0.boxed());
+		listener.add_view_aggregated_stream(block_hash1, view_stream1.boxed());
 
 		let out = handle.await.unwrap();
 
@@ -678,12 +866,15 @@ mod tests {
 		]
 		.contains(v)));
 		assert_eq!(out.len(), 5);
+
+		let _ = terminate_listener.send(());
+		let _ = listener_task.await.unwrap();
 	}
 
 	#[tokio::test]
 	async fn test03() {
 		sp_tracing::try_init_simple();
-		let listener = MultiViewListener::new();
+		let (listener, terminate_listener, listener_task) = create_multi_view_listener();
 
 		let block_hash0 = H256::repeat_byte(0x01);
 		let events0 = vec![
@@ -699,13 +890,18 @@ mod tests {
 		let external_watcher = listener.create_external_watcher_for_tx(tx_hash).unwrap();
 		let handle = tokio::spawn(async move { external_watcher.collect::<Vec<_>>().await });
 
-		let view_stream0 = futures::stream::iter(events0.clone());
-		let view_stream1 = futures::stream::iter(events1.clone());
+		let view_stream0 =
+			futures::stream::iter(std::iter::repeat(tx_hash).zip(events0.clone().into_iter()));
+		let view_stream1 =
+			futures::stream::iter(std::iter::repeat(tx_hash).zip(events1.clone().into_iter()));
 
-		listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed());
-		listener.add_view_watcher_for_tx(tx_hash, block_hash1, view_stream1.boxed());
+		listener.add_view_aggregated_stream(block_hash0, view_stream0.boxed());
+		listener.add_view_aggregated_stream(block_hash1, view_stream1.boxed());
 
-		listener.invalidate_transactions(&[tx_hash]);
+		listener.remove_view(block_hash0);
+		listener.remove_view(block_hash1);
+
+		listener.transactions_invalidated(&[tx_hash]);
 
 		let out = handle.await.unwrap();
 		debug!("out: {:#?}", out);
@@ -717,12 +913,15 @@ mod tests {
 		]
 		.contains(v)));
 		assert_eq!(out.len(), 4);
-	}
 
+		let _ = terminate_listener.send(());
+		let _ = listener_task.await.unwrap();
+	}
 	#[tokio::test]
 	async fn test032() {
 		sp_tracing::try_init_simple();
-		let listener = MultiViewListener::new();
+		let (listener, terminate_listener, listener_task) = create_multi_view_listener();
 
 		let block_hash0 = H256::repeat_byte(0x01);
 		let events0_tx0 = vec![TransactionStatus::Future];
@@ -745,19 +944,26 @@ mod tests {
 		let handle0 = tokio::spawn(async move { external_watcher_tx0.collect::<Vec<_>>().await });
 		let handle1 = tokio::spawn(async move { external_watcher_tx1.collect::<Vec<_>>().await });
 
-		let view0_tx0_stream = futures::stream::iter(events0_tx0.clone());
-		let view0_tx1_stream = futures::stream::iter(events0_tx1.clone());
+		let view0_tx0_stream =
+			futures::stream::iter(std::iter::repeat(tx0_hash).zip(events0_tx0.clone()));
+		let view0_tx1_stream =
+			futures::stream::iter(std::iter::repeat(tx1_hash).zip(events0_tx1.clone()));
+
+		let view1_tx0_stream =
+			futures::stream::iter(std::iter::repeat(tx0_hash).zip(events1_tx0.clone()));
+		let view1_tx1_stream =
+			futures::stream::iter(std::iter::repeat(tx1_hash).zip(events1_tx1.clone()));
 
-		let view1_tx0_stream = futures::stream::iter(events1_tx0.clone());
-		let view1_tx1_stream = futures::stream::iter(events1_tx1.clone());
+		listener.add_view_aggregated_stream(block_hash0, view0_tx0_stream.boxed());
+		listener.add_view_aggregated_stream(block_hash1, view1_tx0_stream.boxed());
+		listener.add_view_aggregated_stream(block_hash0, view0_tx1_stream.boxed());
+		listener.add_view_aggregated_stream(block_hash1, view1_tx1_stream.boxed());
 
-		listener.add_view_watcher_for_tx(tx0_hash, block_hash0, view0_tx0_stream.boxed());
-		listener.add_view_watcher_for_tx(tx0_hash, block_hash1, view1_tx0_stream.boxed());
-		listener.add_view_watcher_for_tx(tx1_hash, block_hash0, view0_tx1_stream.boxed());
-		listener.add_view_watcher_for_tx(tx1_hash, block_hash1, view1_tx1_stream.boxed());
+		listener.remove_view(block_hash0);
+		listener.remove_view(block_hash1);
 
-		listener.invalidate_transactions(&[tx0_hash]);
-		listener.invalidate_transactions(&[tx1_hash]);
+		listener.transactions_invalidated(&[tx0_hash]);
+		listener.transactions_invalidated(&[tx1_hash]);
 
 		let out_tx0 = handle0.await.unwrap();
 		let out_tx1 = handle1.await.unwrap();
@@ -780,12 +986,15 @@ mod tests {
 		.contains(v)));
 		assert_eq!(out_tx0.len(), 4);
 		assert_eq!(out_tx1.len(), 3);
+
+		let _ = terminate_listener.send(());
+		let _ = listener_task.await.unwrap();
 	}
 
 	#[tokio::test]
 	async fn test04() {
 		sp_tracing::try_init_simple();
-		let listener = MultiViewListener::new();
+		let (listener, terminate_listener, listener_task) = create_multi_view_listener();
 
 		let block_hash0 = H256::repeat_byte(0x01);
 		let events0 = vec![
@@ -801,18 +1010,20 @@ mod tests {
 		let external_watcher = listener.create_external_watcher_for_tx(tx_hash).unwrap();
 
 		//views will keep transaction valid, invalidation shall not happen
-		let view_stream0 = futures::stream::iter(events0.clone()).chain(stream::pending().boxed());
-		let view_stream1 = futures::stream::iter(events1.clone()).chain(stream::pending().boxed());
+		let view_stream0 = futures::stream::iter(std::iter::repeat(tx_hash).zip(events0.clone()))
+			.chain(stream::pending().boxed());
+		let view_stream1 = futures::stream::iter(std::iter::repeat(tx_hash).zip(events1.clone()))
+			.chain(stream::pending().boxed());
 
 		let handle = tokio::spawn(async move {
 			// views are still there, we need to fetch 3 events
 			external_watcher.take(3).collect::<Vec<_>>().await
 		});
 
-		listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed());
-		listener.add_view_watcher_for_tx(tx_hash, block_hash1, view_stream1.boxed());
+		listener.add_view_aggregated_stream(block_hash0, view_stream0.boxed());
+		listener.add_view_aggregated_stream(block_hash1, view_stream1.boxed());
 
-		listener.invalidate_transactions(&[tx_hash]);
+		listener.transactions_invalidated(&[tx_hash]);
 
 		let out = handle.await.unwrap();
 		debug!("out: {:#?}", out);
@@ -825,12 +1036,14 @@ mod tests {
 		]
 		.contains(v)));
 		assert_eq!(out.len(), 3);
+		let _ = terminate_listener.send(());
+		let _ = listener_task.await.unwrap();
 	}
 
 	#[tokio::test]
 	async fn test05() {
 		sp_tracing::try_init_simple();
-		let listener = MultiViewListener::new();
+		let (listener, terminate_listener, listener_task) = create_multi_view_listener();
 
 		let block_hash0 = H256::repeat_byte(0x01);
 		let events0 = vec![TransactionStatus::Invalid];
@@ -839,18 +1052,24 @@ mod tests {
 		let external_watcher = listener.create_external_watcher_for_tx(tx_hash).unwrap();
 		let handle = tokio::spawn(async move { external_watcher.collect::<Vec<_>>().await });
 
-		let view_stream0 = futures::stream::iter(events0.clone()).chain(stream::pending().boxed());
+		let view_stream0 = futures::stream::iter(std::iter::repeat(tx_hash).zip(events0.clone()))
+			.chain(stream::pending().boxed());
 
 		// Note: this generates actual Invalid event.
-		// Invalid event from View's stream is intentionally ignored.
-		listener.invalidate_transactions(&[tx_hash]);
+		// Invalid event from View's stream is intentionally ignored.
+		// We need to explicitly remove the view.
+		listener.remove_view(block_hash0);
+		listener.transactions_invalidated(&[tx_hash]);
 
-		listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed());
+		listener.add_view_aggregated_stream(block_hash0, view_stream0.boxed());
 
 		let out = handle.await.unwrap();
 		debug!("out: {:#?}", out);
 
 		assert!(out.iter().all(|v| vec![TransactionStatus::Invalid].contains(v)));
 		assert_eq!(out.len(), 1);
+
+		let _ = terminate_listener.send(());
+		let _ = listener_task.await.unwrap();
 	}
 }
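
The tests above all build view streams as `std::iter::repeat(tx_hash).zip(events)`; that is exactly the shape of the new aggregated stream: a single stream per view whose items pair a transaction hash with its status. A small sketch of that shape with simplified stand-in types:

use futures::{executor::block_on, stream, StreamExt};

#[derive(Debug, Clone, PartialEq)]
enum Status {
    Ready,
    InBlock(usize),
}

fn main() {
    let tx_hash = 0x0au8;
    let events = vec![Status::Ready, Status::InBlock(0)];
    // Same construction as the tests: repeat the hash, zip with statuses.
    let view_stream = stream::iter(std::iter::repeat(tx_hash).zip(events));
    let out = block_on(view_stream.collect::<Vec<_>>());
    assert_eq!(out, vec![(0x0a, Status::Ready), (0x0a, Status::InBlock(0))]);
}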
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
index 440e77313d3e13a35885bd806526139f602d7ad8..e141016ccb28b39a1be816478c0f6da748d875ce 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
@@ -77,10 +77,10 @@ where
 	Block: BlockT,
 	ChainApi: graph::ChainApi<Block = Block> + 'static,
 {
-	//todo: add listener for updating listeners with events [#5495]
 	/// Is the progress of transaction watched.
 	///
-	/// Was transaction sent with `submit_and_watch`.
+	/// Indicates whether the transaction was sent with `submit_and_watch`. Serves only
+	/// stats/testing purposes.
 	watched: bool,
 	/// Extrinsic actual body.
 	tx: ExtrinsicFor<ChainApi>,
@@ -93,14 +93,6 @@ where
 	/// Priority of transaction at some block. It is assumed it will not be changed often. None if
 	/// not known.
 	priority: RwLock<Option<TransactionPriority>>,
-	//todo: we need to add future / ready status at finalized block.
-	//If future transactions are stuck in tx_mem_pool (due to limits being hit), we need a means
-	// to replace them somehow with newly coming transactions.
-	// For sure priority is one of them, but some additional criteria maybe required.
-	//
-	// The other maybe simple solution for this could be just obeying 10% limit for future in
-	// tx_mem_pool. Oldest future transaction could be just dropped. *(Status at finalized would
-	// also be needed). Probably is_future_at_finalized:Option<bool> flag will be enought
 }
 
 impl<ChainApi, Block> TxInMemPool<ChainApi, Block>
@@ -215,7 +207,6 @@ where
 	/// A shared instance of the `MultiViewListener`.
 	///
 	/// Provides a side-channel allowing to send per-transaction state changes notification.
-	//todo: could be removed after removing watched field (and adding listener into tx) [#5495]
 	listener: Arc<MultiViewListener<ChainApi>>,
 
 	///  A map that stores the transactions currently in the memory pool.
@@ -277,7 +268,7 @@ where
 	}
 
 	/// Creates a new `TxMemPool` instance for testing purposes.
-	#[allow(dead_code)]
+	#[cfg(test)]
 	fn new_test(
 		api: Arc<ChainApi>,
 		max_transactions_count: usize,
@@ -285,7 +276,7 @@ where
 	) -> Self {
 		Self {
 			api,
-			listener: Arc::from(MultiViewListener::new()),
+			listener: Arc::from(MultiViewListener::new_with_worker().0),
 			transactions: Default::default(),
 			metrics: Default::default(),
 			max_transactions_count,
@@ -469,27 +460,11 @@ where
 		self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length))
 	}
 
-	/// Clones and returns a `HashMap` of references to all unwatched transactions in the memory
-	/// pool.
-	pub(super) fn clone_unwatched(
-		&self,
-	) -> HashMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>> {
-		self.transactions
-			.read()
-			.iter()
-			.filter_map(|(hash, tx)| (!tx.is_watched()).then(|| (*hash, tx.clone())))
-			.collect::<HashMap<_, _>>()
-	}
-
-	/// Clones and returns a `HashMap` of references to all watched transactions in the memory pool.
-	pub(super) fn clone_watched(
+	/// Clones and returns a `HashMap` of references to all transactions in the memory pool.
+	pub(super) fn clone_transactions(
 		&self,
 	) -> HashMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>> {
-		self.transactions
-			.read()
-			.iter()
-			.filter_map(|(hash, tx)| (tx.is_watched()).then(|| (*hash, tx.clone())))
-			.collect::<HashMap<_, _>>()
+		self.transactions.clone_map()
 	}
 
 	/// Removes a transaction with given hash from the memory pool.
@@ -611,7 +586,7 @@ where
 		invalid_hashes.iter().for_each(|i| {
 			transactions.remove(i);
 		});
-		self.listener.invalidate_transactions(&invalid_hashes);
+		self.listener.transactions_invalidated(&invalid_hashes);
 	}
 
 	/// Updates the priority of transaction stored in mempool using provided view_store submission
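
The `clone_unwatched`/`clone_watched` pair above collapses into a single `clone_transactions`, i.e. one snapshot of the whole map instead of two filtered passes. A sketch of that snapshot pattern, assuming a `parking_lot::RwLock`-guarded map as used elsewhere in this pool (key and value types simplified):

use parking_lot::RwLock;
use std::{collections::HashMap, sync::Arc};

struct MemPool {
    transactions: RwLock<HashMap<u64, Arc<String>>>,
}

impl MemPool {
    // One read-lock acquisition and one map clone; cloning an `Arc` only
    // bumps a reference count, so transaction bodies are not copied.
    fn clone_transactions(&self) -> HashMap<u64, Arc<String>> {
        self.transactions.read().clone()
    }
}

fn main() {
    let pool = MemPool { transactions: RwLock::new(HashMap::new()) };
    pool.transactions.write().insert(1, Arc::new("xt".to_string()));
    assert_eq!(pool.clone_transactions().len(), 1);
}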
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
index 6324997da67b3bd835b85e1ba168b68418c67bff..555444956122b721d220b3754fa67d2308381e89 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
@@ -27,8 +27,8 @@ use super::metrics::MetricsLink as PrometheusMetrics;
 use crate::{
 	common::tracing_log_xt::log_xt_trace,
 	graph::{
-		self, base_pool::TimedTransactionSource, watcher::Watcher, ExtrinsicFor, ExtrinsicHash,
-		IsValidator, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor,
+		self, base_pool::TimedTransactionSource, ExtrinsicFor, ExtrinsicHash, IsValidator,
+		ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor,
 	},
 	LOG_TARGET,
 };
@@ -155,6 +155,18 @@ where
 		}
 	}
 
+	/// Imports a single unvalidated extrinsic into the view.
+	pub(super) async fn submit_one(
+		&self,
+		source: TimedTransactionSource,
+		xt: ExtrinsicFor<ChainApi>,
+	) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
+		self.submit_many(std::iter::once((source, xt)))
+			.await
+			.pop()
+			.expect("There is exactly one result, qed.")
+	}
+
 	/// Imports many unvalidated extrinsics into the view.
 	pub(super) async fn submit_many(
 		&self,
@@ -162,28 +174,17 @@ where
 	) -> Vec<Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error>> {
 		if tracing::enabled!(target: LOG_TARGET, tracing::Level::TRACE) {
 			let xts = xts.into_iter().collect::<Vec<_>>();
-			log_xt_trace!(target: LOG_TARGET, xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0), "view::submit_many at:{}", self.at.hash);
+			log_xt_trace!(
+				target: LOG_TARGET,
+				xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0),
+				"view::submit_many at:{}",
+				self.at.hash);
 			self.pool.submit_at(&self.at, xts).await
 		} else {
 			self.pool.submit_at(&self.at, xts).await
 		}
 	}
 
-	/// Import a single extrinsic and starts to watch its progress in the view.
-	pub(super) async fn submit_and_watch(
-		&self,
-		source: TimedTransactionSource,
-		xt: ExtrinsicFor<ChainApi>,
-	) -> Result<ValidatedPoolSubmitOutcome<ChainApi>, ChainApi::Error> {
-		trace!(
-			target: LOG_TARGET,
-			tx_hash = ?self.pool.validated_pool().api().hash_and_length(&xt).0,
-			view_at_hash = ?self.at.hash,
-			"view::submit_and_watch"
-		);
-		self.pool.submit_and_watch(&self.at, source, xt).await
-	}
-
 	/// Synchronously imports single unvalidated extrinsics into the view.
 	pub(super) fn submit_local(
 		&self,
@@ -237,18 +238,6 @@ where
 		self.pool.validated_pool().status()
 	}
 
-	/// Creates a watcher for given transaction.
-	///
-	/// Intended to be called for the transaction that already exists in the pool
-	pub(super) fn create_watcher(
-		&self,
-		tx_hash: ExtrinsicHash<ChainApi>,
-	) -> Watcher<ExtrinsicHash<ChainApi>, ExtrinsicHash<ChainApi>> {
-		//todo(minor): some assert could be added here - to make sure that transaction actually
-		// exists in the view.
-		self.pool.validated_pool().create_watcher(tx_hash)
-	}
-
 	/// Revalidates some part of transaction from the internal pool.
 	///
 	/// Intended to be called from the revalidation worker. The revalidation process can be
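
With `submit_and_watch` and `create_watcher` removed, `submit_one` above simply funnels a single extrinsic through the batch path and unwraps the lone result. The pattern in isolation (stand-in types; the real method returns a `ValidatedPoolSubmitOutcome`):

async fn submit_many(xts: Vec<u32>) -> Vec<Result<u32, String>> {
    // Stand-in for the real batch submission.
    xts.into_iter().map(Ok).collect()
}

async fn submit_one(xt: u32) -> Result<u32, String> {
    submit_many(vec![xt])
        .await
        .pop()
        // One input yields exactly one result, so `pop` cannot return `None`.
        .expect("There is exactly one result, qed.")
}

fn main() {
    assert_eq!(futures::executor::block_on(submit_one(7)), Ok(7));
}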
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
index c4209a7d7f4118d78651fa821fc96ff068dbaa78..e534decf9b1ada3d692a1ed3392475b4983b56ae 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
@@ -31,7 +31,6 @@ use crate::{
 	},
 	ReadyIteratorFor, LOG_TARGET,
 };
-use futures::prelude::*;
 use itertools::Itertools;
 use parking_lot::RwLock;
 use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus};
@@ -55,8 +54,6 @@ where
 	xt: ExtrinsicFor<ChainApi>,
 	/// Source of the transaction.
 	source: TimedTransactionSource,
-	/// Inidicates if transaction is watched.
-	watched: bool,
 }
 
 /// Helper type representing the callback allowing to trigger per-transaction events on
@@ -108,14 +105,10 @@ where
 	ChainApi: graph::ChainApi,
 {
 	/// Creates new unprocessed instance of pending transaction submission.
-	fn new_submission_action(
-		xt: ExtrinsicFor<ChainApi>,
-		source: TimedTransactionSource,
-		watched: bool,
-	) -> Self {
+	fn new_submission_action(xt: ExtrinsicFor<ChainApi>, source: TimedTransactionSource) -> Self {
 		Self {
 			processed: false,
-			action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source, watched }),
+			action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source }),
 		}
 	}
 
@@ -290,7 +283,7 @@ where
 		let Some(external_watcher) = self.listener.create_external_watcher_for_tx(tx_hash) else {
 			return Err(PoolError::AlreadyImported(Box::new(tx_hash)).into())
 		};
-		let submit_and_watch_futures = {
+		let submit_futures = {
 			let active_views = self.active_views.read();
 			active_views
 				.iter()
@@ -298,23 +291,11 @@ where
 					let view = view.clone();
 					let xt = xt.clone();
 					let source = source.clone();
-					async move {
-						match view.submit_and_watch(source, xt).await {
-							Ok(mut result) => {
-								self.listener.add_view_watcher_for_tx(
-									tx_hash,
-									view.at.hash,
-									result.expect_watcher().into_stream().boxed(),
-								);
-								Ok(result)
-							},
-							Err(e) => Err(e),
-						}
-					}
+					async move { view.submit_one(source, xt).await }
 				})
 				.collect::<Vec<_>>()
 		};
-		let result = futures::future::join_all(submit_and_watch_futures)
+		let result = futures::future::join_all(submit_futures)
 			.await
 			.into_iter()
 			.find_or_first(Result::is_ok);
@@ -462,7 +443,7 @@ where
 			extrinsics
 				.iter()
 				.enumerate()
-				.for_each(|(i, tx_hash)| self.listener.finalize_transaction(*tx_hash, *block, i));
+				.for_each(|(i, tx_hash)| self.listener.transaction_finalized(*tx_hash, *block, i));
 
 			finalized_transactions.extend(extrinsics);
 		}
@@ -705,14 +686,9 @@ where
 		source: TimedTransactionSource,
 		xt: ExtrinsicFor<ChainApi>,
 		replaced: ExtrinsicHash<ChainApi>,
-		watched: bool,
 	) {
 		if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(replaced) {
-			entry.insert(PendingPreInsertTask::new_submission_action(
-				xt.clone(),
-				source.clone(),
-				watched,
-			));
+			entry.insert(PendingPreInsertTask::new_submission_action(xt.clone(), source.clone()));
 		} else {
 			return
 		};
@@ -722,11 +698,9 @@ where
 			target: LOG_TARGET,
 			?replaced,
 			?tx_hash,
-			watched,
 			"replace_transaction"
 		);
-
-		self.replace_transaction_in_views(source, xt, tx_hash, replaced, watched).await;
+		self.replace_transaction_in_views(source, xt, tx_hash, replaced).await;
 
 		if let Some(replacement) = self.pending_txs_tasks.write().get_mut(&replaced) {
 			replacement.mark_processed();
@@ -747,7 +721,6 @@ where
 						submission.source.clone(),
 						submission.xt.clone(),
 						xt_hash,
-						submission.watched,
 					));
 				},
 				PreInsertAction::RemoveSubtree(ref removal) => {
@@ -768,37 +741,15 @@ where
 		source: TimedTransactionSource,
 		xt: ExtrinsicFor<ChainApi>,
 		tx_hash: ExtrinsicHash<ChainApi>,
-		watched: bool,
 	) {
-		if watched {
-			match view.submit_and_watch(source, xt).await {
-				Ok(mut result) => {
-					self.listener.add_view_watcher_for_tx(
-						tx_hash,
-						view.at.hash,
-						result.expect_watcher().into_stream().boxed(),
-					);
-				},
-				Err(error) => {
-					trace!(
-						target: LOG_TARGET,
-						?tx_hash,
-						at_hash = ?view.at.hash,
-						%error,
-						"replace_transaction: submit_and_watch failed"
-					);
-				},
-			}
-		} else {
-			if let Some(Err(error)) = view.submit_many(std::iter::once((source, xt))).await.pop() {
-				trace!(
-					target: LOG_TARGET,
-					?tx_hash,
-					at_hash = ?view.at.hash,
-					%error,
-					"replace_transaction: submit failed"
-				);
-			}
+		if let Err(error) = view.submit_one(source, xt).await {
+			trace!(
+				target: LOG_TARGET,
+				?tx_hash,
+				at_hash = ?view.at.hash,
+				%error,
+				"replace_transaction: submit failed"
+			);
 		}
 	}
 
@@ -812,17 +763,7 @@ where
 		xt: ExtrinsicFor<ChainApi>,
 		tx_hash: ExtrinsicHash<ChainApi>,
 		replaced: ExtrinsicHash<ChainApi>,
-		watched: bool,
 	) {
-		if watched && !self.listener.contains_tx(&tx_hash) {
-			trace!(
-				target: LOG_TARGET,
-				?tx_hash,
-				"error: replace_transaction_in_views: no listener for watched transaction"
-			);
-			return;
-		}
-
 		let submit_futures = {
 			let active_views = self.active_views.read();
 			let inactive_views = self.inactive_views.read();
@@ -836,7 +777,6 @@ where
 						source.clone(),
 						xt.clone(),
 						tx_hash,
-						watched,
 					)
 				})
 				.collect::<Vec<_>>()
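
The view-store fan-out above submits the same transaction to every active view concurrently, then prefers the first `Ok` outcome (falling back to the first error). The same selection in isolation, using `itertools::Itertools::find_or_first` as the pool does:

use futures::future::join_all;
use itertools::Itertools;

async fn submit_to_view(view: u32) -> Result<u32, String> {
    // Stand-in submission: even-numbered views accept, odd ones reject.
    if view % 2 == 0 {
        Ok(view)
    } else {
        Err(format!("view {view} rejected"))
    }
}

fn main() {
    let submit_futures = (1..=4).map(submit_to_view).collect::<Vec<_>>();
    let result = futures::executor::block_on(join_all(submit_futures))
        .into_iter()
        // First `Ok` if any view accepted, otherwise the very first `Err`.
        .find_or_first(Result::is_ok);
    assert_eq!(result, Some(Ok(2)));
}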
diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs
index 7b09ee4c640958736d8acfddd5baab572e1bf530..0e70334ea0e2485967e35e533dc7281a3cef9421 100644
--- a/substrate/client/transaction-pool/src/graph/listener.rs
+++ b/substrate/client/transaction-pool/src/graph/listener.rs
@@ -29,10 +29,13 @@ use super::{watcher, BlockHash, ChainApi, ExtrinsicHash};
 
 static LOG_TARGET: &str = "txpool::watcher";
 
-/// Single event used in dropped by limits stream. It is one of Ready/Future/Dropped.
-pub type DroppedByLimitsEvent<H, BH> = (H, TransactionStatus<H, BH>);
-/// Stream of events used to determine if a transaction was dropped.
-pub type DroppedByLimitsStream<H, BH> = TracingUnboundedReceiver<DroppedByLimitsEvent<H, BH>>;
+/// Single event used in the aggregated stream. A tuple containing a transaction hash and its status.
+pub type TransactionStatusEvent<H, BH> = (H, TransactionStatus<H, BH>);
+/// Stream of events providing statuses of all the transactions within the pool.
+pub type AggregatedStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>;
+
+/// Warning threshold for the (unbounded) channels used by the dropped and aggregated streams.
+const AGGREGATED_STREAM_WARN_THRESHOLD: usize = 100_000;
 
 /// Extrinsic pool default listener.
 pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
@@ -40,10 +43,15 @@ pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
 	watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>,
 	finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>,
 
-	/// The sink used to notify dropped-by-enforcing-limits transactions. Also ready and future
-	/// statuses are reported via this channel to allow consumer of the stream tracking actual
-	/// drops.
-	dropped_by_limits_sink: Option<TracingUnboundedSender<DroppedByLimitsEvent<H, BlockHash<C>>>>,
+	/// The sink used to notify about transactions dropped by limit enforcement or usurped by
+	/// other transactions.
+	///
+	/// Note: Ready and future statuses are also communicated through this channel, enabling the
+	/// stream consumer to track views that reference the transaction.
+	dropped_stream_sink: Option<TracingUnboundedSender<TransactionStatusEvent<H, BlockHash<C>>>>,
+
+	/// The sink of the single, merged stream providing updates for all the transactions in the
+	/// associated pool.
+	aggregated_stream_sink: Option<TracingUnboundedSender<TransactionStatusEvent<H, BlockHash<C>>>>,
 }
 
 /// Maximum number of blocks awaiting finality at any time.
@@ -54,7 +62,8 @@ impl<H: hash::Hash + Eq + Debug, C: ChainApi> Default for Listener<H, C> {
 		Self {
 			watchers: Default::default(),
 			finality_watchers: Default::default(),
-			dropped_by_limits_sink: None,
+			dropped_stream_sink: None,
+			aggregated_stream_sink: None,
 		}
 	}
 }
@@ -84,21 +93,60 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H
 		sender.new_watcher(hash)
 	}
 
-	/// Creates a new single stream for entire pool.
+	/// Creates a new single stream intended to watch dropped transactions only.
 	///
-	/// The stream can be used to subscribe to life-cycle events of all extrinsics in the pool.
-	pub fn create_dropped_by_limits_stream(&mut self) -> DroppedByLimitsStream<H, BlockHash<C>> {
-		let (sender, single_stream) = tracing_unbounded("mpsc_txpool_watcher", 100_000);
-		self.dropped_by_limits_sink = Some(sender);
+	/// The stream can be used to subscribe to events related to dropping of all extrinsics in the
+	/// pool.
+	pub fn create_dropped_by_limits_stream(&mut self) -> AggregatedStream<H, BlockHash<C>> {
+		let (sender, single_stream) =
+			tracing_unbounded("mpsc_txpool_watcher", AGGREGATED_STREAM_WARN_THRESHOLD);
+		self.dropped_stream_sink = Some(sender);
 		single_stream
 	}
 
-	/// Notify the listeners about extrinsic broadcast.
+	/// Creates a new single merged stream for all extrinsics in the associated pool.
+	///
+	/// The stream can be used to subscribe to life-cycle events of all extrinsics in the pool. For
+	/// some implementations (e.g. fork-aware pool) this approach may be more efficient than using
+	/// individual streams for every transaction.
+	///
+	/// Note: events that are currently ignored on the other side of this channel (the external
+	/// watcher) are not sent.
+	pub fn create_aggregated_stream(&mut self) -> AggregatedStream<H, BlockHash<C>> {
+		let (sender, aggregated_stream) =
+			tracing_unbounded("mpsc_txpool_aggregated_stream", AGGREGATED_STREAM_WARN_THRESHOLD);
+		self.aggregated_stream_sink = Some(sender);
+		aggregated_stream
+	}
+
+	/// Notify the listeners about the extrinsic broadcast.
 	pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) {
 		trace!(target: LOG_TARGET, "[{:?}] Broadcasted", hash);
 		self.fire(hash, |watcher| watcher.broadcast(peers));
 	}
 
+	/// Sends the given event to the `dropped_stream_sink`.
+	fn send_to_dropped_stream_sink(&mut self, tx: &H, status: TransactionStatus<H, BlockHash<C>>) {
+		if let Some(ref sink) = self.dropped_stream_sink {
+			if let Err(e) = sink.unbounded_send((tx.clone(), status.clone())) {
+				trace!(target: LOG_TARGET, "[{:?}] dropped_sink: {:?} send message failed: {:?}", tx, status, e);
+			}
+		}
+	}
+
+	/// Sends the given event to the `aggregated_stream_sink`.
+	fn send_to_aggregated_stream_sink(
+		&mut self,
+		tx: &H,
+		status: TransactionStatus<H, BlockHash<C>>,
+	) {
+		if let Some(ref sink) = self.aggregated_stream_sink {
+			if let Err(e) = sink.unbounded_send((tx.clone(), status.clone())) {
+				trace!(target: LOG_TARGET, "[{:?}] aggregated_stream {:?} send message failed: {:?}", tx, status, e);
+			}
+		}
+	}
+
 	/// New transaction was added to the ready pool or promoted from the future pool.
 	pub fn ready(&mut self, tx: &H, old: Option<&H>) {
 		trace!(target: LOG_TARGET, "[{:?}] Ready (replaced with {:?})", tx, old);
@@ -107,22 +155,17 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H
 			self.fire(old, |watcher| watcher.usurped(tx.clone()));
 		}
 
-		if let Some(ref sink) = self.dropped_by_limits_sink {
-			if let Err(e) = sink.unbounded_send((tx.clone(), TransactionStatus::Ready)) {
-				trace!(target: LOG_TARGET, "[{:?}] dropped_sink/ready: send message failed: {:?}", tx, e);
-			}
-		}
+		self.send_to_dropped_stream_sink(tx, TransactionStatus::Ready);
+		self.send_to_aggregated_stream_sink(tx, TransactionStatus::Ready);
 	}
 
 	/// New transaction was added to the future pool.
 	pub fn future(&mut self, tx: &H) {
 		trace!(target: LOG_TARGET, "[{:?}] Future", tx);
 		self.fire(tx, |watcher| watcher.future());
-		if let Some(ref sink) = self.dropped_by_limits_sink {
-			if let Err(e) = sink.unbounded_send((tx.clone(), TransactionStatus::Future)) {
-				trace!(target: LOG_TARGET, "[{:?}] dropped_sink: send message failed: {:?}", tx, e);
-			}
-		}
+
+		self.send_to_dropped_stream_sink(tx, TransactionStatus::Future);
+		self.send_to_aggregated_stream_sink(tx, TransactionStatus::Future);
 	}
 
 	/// Transaction was dropped from the pool because of enforcing the limit.
@@ -130,11 +173,7 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H
 		trace!(target: LOG_TARGET, "[{:?}] Dropped (limits enforced)", tx);
 		self.fire(tx, |watcher| watcher.limit_enforced());
 
-		if let Some(ref sink) = self.dropped_by_limits_sink {
-			if let Err(e) = sink.unbounded_send((tx.clone(), TransactionStatus::Dropped)) {
-				trace!(target: LOG_TARGET, "[{:?}] dropped_sink: send message failed: {:?}", tx, e);
-			}
-		}
+		self.send_to_dropped_stream_sink(tx, TransactionStatus::Dropped);
 	}
 
 	/// Transaction was replaced with other extrinsic.
@@ -142,13 +181,7 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H
 		trace!(target: LOG_TARGET, "[{:?}] Dropped (replaced with {:?})", tx, by);
 		self.fire(tx, |watcher| watcher.usurped(by.clone()));
 
-		if let Some(ref sink) = self.dropped_by_limits_sink {
-			if let Err(e) =
-				sink.unbounded_send((tx.clone(), TransactionStatus::Usurped(by.clone())))
-			{
-				trace!(target: LOG_TARGET, "[{:?}] dropped_sink: send message failed: {:?}", tx, e);
-			}
-		}
+		self.send_to_dropped_stream_sink(tx, TransactionStatus::Usurped(by.clone()));
 	}
 
 	/// Transaction was dropped from the pool because of the failure during the resubmission of
@@ -174,11 +207,17 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H
 		let tx_index = txs.len() - 1;
 
 		self.fire(tx, |watcher| watcher.in_block(block_hash, tx_index));
+		self.send_to_aggregated_stream_sink(tx, TransactionStatus::InBlock((block_hash, tx_index)));
 
 		while self.finality_watchers.len() > MAX_FINALITY_WATCHERS {
 			if let Some((hash, txs)) = self.finality_watchers.pop_front() {
 				for tx in txs {
 					self.fire(&tx, |watcher| watcher.finality_timeout(hash));
+					//todo: do we need this? [related issue: #5482]
+					self.send_to_aggregated_stream_sink(
+						&tx,
+						TransactionStatus::FinalityTimeout(hash),
+					);
 				}
 			}
 		}
@@ -188,7 +227,8 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H
 	pub fn retracted(&mut self, block_hash: BlockHash<C>) {
 		if let Some(hashes) = self.finality_watchers.remove(&block_hash) {
 			for hash in hashes {
-				self.fire(&hash, |watcher| watcher.retracted(block_hash))
+				self.fire(&hash, |watcher| watcher.retracted(block_hash));
+				// note: [#5479], we do not send to aggregated stream.
 			}
 		}
 	}
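
The two `send_to_*` helpers above share one idiom: an event is emitted only if a consumer actually created the stream, and a failed send is logged rather than propagated. A minimal sketch with simplified event types:

use futures::channel::mpsc::{unbounded, UnboundedSender};

struct Listener {
    // `None` until a consumer calls `create_aggregated_stream`.
    aggregated_sink: Option<UnboundedSender<(u64, &'static str)>>,
}

impl Listener {
    fn send_to_aggregated_stream_sink(&self, tx: u64, status: &'static str) {
        if let Some(sink) = &self.aggregated_sink {
            if let Err(e) = sink.unbounded_send((tx, status)) {
                // Mirrors the pool: failures are traced, never bubbled up.
                eprintln!("[{tx:?}] aggregated_stream send failed: {e:?}");
            }
        }
    }
}

fn main() {
    let (sink, mut stream) = unbounded();
    let listener = Listener { aggregated_sink: Some(sink) };
    listener.send_to_aggregated_stream_sink(1, "Ready");
    assert_eq!(stream.try_next().unwrap(), Some((1, "Ready")));
}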
diff --git a/substrate/client/transaction-pool/src/graph/mod.rs b/substrate/client/transaction-pool/src/graph/mod.rs
index 2114577f4dee74bf51e25a1375b8650cf8f4acf7..c3161799785a97f668c93c8773257e57d3dfe91d 100644
--- a/substrate/client/transaction-pool/src/graph/mod.rs
+++ b/substrate/client/transaction-pool/src/graph/mod.rs
@@ -46,7 +46,9 @@ pub use validated_pool::{
 };
 
 pub(crate) use self::pool::CheckBannedBeforeVerify;
-pub(crate) use listener::DroppedByLimitsEvent;
+pub(crate) use listener::TransactionStatusEvent;
 
+#[cfg(doc)]
+pub(crate) use listener::AggregatedStream;
 #[cfg(doc)]
 pub(crate) use validated_pool::ValidatedPool;
diff --git a/substrate/client/transaction-pool/src/graph/tracked_map.rs b/substrate/client/transaction-pool/src/graph/tracked_map.rs
index fe15c6eca308084f18261b8d7ebc003509d3d735..ca1ee035cf37e1e5b8834ed8df49fcd818d37b51 100644
--- a/substrate/client/transaction-pool/src/graph/tracked_map.rs
+++ b/substrate/client/transaction-pool/src/graph/tracked_map.rs
@@ -120,11 +120,6 @@ where
 	pub fn len(&self) -> usize {
 		self.inner_guard.len()
 	}
-
-	/// Returns an iterator over all key-value pairs.
-	pub fn iter(&self) -> Iter<'_, K, V> {
-		self.inner_guard.iter()
-	}
 }
 
 pub struct TrackedMapWriteAccess<'a, K, V> {
diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs
index bbfcb9b40acab9a9bb58607fc0b7b96130e8bab1..9631a27ead93416b4fbc52aa35d2f0b2c036bfad 100644
--- a/substrate/client/transaction-pool/src/graph/validated_pool.rs
+++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs
@@ -747,12 +747,20 @@ impl<B: ChainApi> ValidatedPool<B> {
 		self.listener.write().retracted(block_hash)
 	}
 
+	/// Refer to [`Listener::create_dropped_by_limits_stream`] for details.
 	pub fn create_dropped_by_limits_stream(
 		&self,
-	) -> super::listener::DroppedByLimitsStream<ExtrinsicHash<B>, BlockHash<B>> {
+	) -> super::listener::AggregatedStream<ExtrinsicHash<B>, BlockHash<B>> {
 		self.listener.write().create_dropped_by_limits_stream()
 	}
 
+	/// Refer to [`Listener::create_aggregated_stream`] for details.
+	pub fn create_aggregated_stream(
+		&self,
+	) -> super::listener::AggregatedStream<ExtrinsicHash<B>, BlockHash<B>> {
+		self.listener.write().create_aggregated_stream()
+	}
+
 	/// Resends ready and future events for all the ready and future transactions that are already
 	/// in the pool.
 	///
diff --git a/substrate/client/transaction-pool/tests/fatp_common/mod.rs b/substrate/client/transaction-pool/tests/fatp_common/mod.rs
index 530c25caf88e7973b6c877f8370fc77fa1a203b2..20178fdc7c4e36837f414c5762c1796e3c09a9cd 100644
--- a/substrate/client/transaction-pool/tests/fatp_common/mod.rs
+++ b/substrate/client/transaction-pool/tests/fatp_common/mod.rs
@@ -203,8 +203,8 @@ macro_rules! assert_future_iterator {
 	($hash:expr, $pool:expr, [$( $xt:expr ),*]) => {{
 		let futures = $pool.futures_at($hash).unwrap();
 		let expected = vec![ $($pool.api().hash_and_length(&$xt).0),*];
-		log::debug!(target:LOG_TARGET, "expected: {:#?}", futures);
-		log::debug!(target:LOG_TARGET, "output: {:#?}", expected);
+		log::debug!(target:LOG_TARGET, "expected: {:#?}", expected);
+		log::debug!(target:LOG_TARGET, "output: {:#?}", futures);
 		assert_eq!(expected.len(), futures.len());
 		let hsf = futures.iter().map(|a| a.hash).collect::<std::collections::HashSet<_>>();
 		let hse = expected.into_iter().collect::<std::collections::HashSet<_>>();
diff --git a/substrate/client/transaction-pool/tests/fatp_limits.rs b/substrate/client/transaction-pool/tests/fatp_limits.rs
index fb02b21ebc2b0426def3f9a98227f9d179fbe952..50e75e1e28e776c8e14be4cc9ca627b986b46aa6 100644
--- a/substrate/client/transaction-pool/tests/fatp_limits.rs
+++ b/substrate/client/transaction-pool/tests/fatp_limits.rs
@@ -377,12 +377,11 @@ fn fatp_limits_watcher_view_can_drop_transcation() {
 	assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped,]);
 
 	assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]);
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
 
 	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
 	block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash())));
 
-	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
-
 	let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::<Vec<_>>();
 	assert_eq!(xt1_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);