From 515cb4042d097581ed6b4195e57b04494e385a17 Mon Sep 17 00:00:00 2001
From: Michal Kucharczyk <>
Date: Tue, 11 Feb 2025 21:24:36 +0100
Subject: [PATCH] View: added observer and streams + some renames

 .../src/fork_aware_txpool/  |   3 +-
 .../fork_aware_txpool/    |  58 +++---
 .../src/fork_aware_txpool/              |   2 +-
 .../fork_aware_txpool/  |   4 +-
 .../fork_aware_txpool/  |  10 +-
 .../src/fork_aware_txpool/             | 194 ++++++++++++++++--
 .../src/fork_aware_txpool/       |  21 +-
 7 files changed, 226 insertions(+), 66 deletions(-)

diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/ b/substrate/client/transaction-pool/src/fork_aware_txpool/
index be20a160896..17f73f9ea77 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/
@@ -74,7 +74,8 @@ pub enum DroppedReason<Hash> {
 /// Dropped-logic related event from the single view.
-pub type ViewStreamEvent<C> = crate::graph::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;
+pub type ViewStreamEvent<C> =
+	crate::fork_aware_txpool::view::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;
 /// Dropped-logic stream of events coming from the single view.
 type ViewStream<C> = Pin<Box<dyn futures::Stream<Item = ViewStreamEvent<C>> + Send>>;
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/ b/substrate/client/transaction-pool/src/fork_aware_txpool/
index ffe6c20d92b..ce5dfca49e9 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/
@@ -466,7 +466,7 @@ where
 		if let Ok((Some(best_tree_route), Some(best_view))) = best_result {
-			let tmp_view: View<ChainApi> =
+			let (tmp_view, _, _): (View<ChainApi>, _, _) =
 				View::new_from_other(&best_view, &HashAndNumber { hash: at, number: block_number });
 			let mut all_extrinsics = vec![];
@@ -1056,26 +1056,28 @@ where
-		let mut view = if let Some(origin_view) = origin_view {
-			let mut view = View::new_from_other(&origin_view, at);
-			if !tree_route.retracted().is_empty() {
-				view.pool.clear_recently_pruned();
-			}
-			view
-		} else {
-			debug!(
-				target: LOG_TARGET,
-				?at,
-				"creating non-cloned view"
-			);
-			View::new(
-				self.api.clone(),
-				at.clone(),
-				self.options.clone(),
-				self.metrics.clone(),
-				self.is_validator.clone(),
-			)
-		};
+		let (mut view, view_dropped_stream, view_aggregated_stream) =
+			if let Some(origin_view) = origin_view {
+				let (mut view, view_dropped_stream, view_aggragated_stream) =
+					View::new_from_other(&origin_view, at);
+				if !tree_route.retracted().is_empty() {
+					view.pool.clear_recently_pruned();
+				}
+				(view, view_dropped_stream, view_aggragated_stream)
+			} else {
+				debug!(
+					target: LOG_TARGET,
+					?at,
+					"creating non-cloned view"
+				);
+				View::new(
+					self.api.clone(),
+					at.clone(),
+					self.options.clone(),
+					self.metrics.clone(),
+					self.is_validator.clone(),
+				)
+			};
 		let start = Instant::now();
 		// 1. Capture all import notification from the very beginning, so first register all
@@ -1085,15 +1087,13 @@ where
-		self.view_store.dropped_stream_controller.add_view(
-			view.pool.validated_pool().create_dropped_by_limits_stream().boxed(),
-		);
+		self.view_store
+			.dropped_stream_controller
+			.add_view(, view_dropped_stream.boxed());
-		self.view_store.listener.add_view_aggregated_stream(
-			view.pool.validated_pool().create_aggregated_stream().boxed(),
-		);
+		self.view_store
+			.listener
+			.add_view_aggregated_stream(, view_aggregated_stream.boxed());
 		// sync the transactions statuses and referencing views in all the listeners with newly
 		// cloned view.
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/ b/substrate/client/transaction-pool/src/fork_aware_txpool/
index 2c4da0182a2..fce2d4ad6b2 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/
@@ -323,7 +323,7 @@
 //! [`MultiViewListener`]: crate::fork_aware_txpool::multi_view_listener::MultiViewListener
 //! [`Pool`]: crate::graph::Pool
 //! [`Watcher`]: crate::graph::watcher::Watcher
-//! [`AggregatedStream`]: crate::graph::AggregatedStream
+//! [`AggregatedStream`]: crate::fork_aware_txpool::view::AggregatedStream
 //! [`Options`]: crate::graph::Options
 //! [`vp::import_notification_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.import_notification_stream
 //! [`vp::enforce_limits`]: ../graph/validated_pool/struct.ValidatedPool.html#method.enforce_limits
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/ b/substrate/client/transaction-pool/src/fork_aware_txpool/
index 107c2941ec1..d3d85aaf093 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/
@@ -22,8 +22,8 @@
 use crate::{
-	fork_aware_txpool::stream_map_util::next_event,
-	graph::{self, BlockHash, ExtrinsicHash, TransactionStatusEvent},
+	fork_aware_txpool::{stream_map_util::next_event, view::TransactionStatusEvent},
+	graph::{self, BlockHash, ExtrinsicHash},
 use futures::{Future, FutureExt, Stream, StreamExt};
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/ b/substrate/client/transaction-pool/src/fork_aware_txpool/
index 0025d3e9f2d..b7e7584640d 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/
@@ -205,13 +205,9 @@ mod tests {
 		let api = Arc::new(TestApi::default());
 		let block0 = api.expect_hash_and_number(0);
-		let view = Arc::new(View::new(
-			api.clone(),
-			block0,
-			Default::default(),
-			Default::default(),
-			false.into(),
-		));
+		let view = Arc::new(
+			View::new(api.clone(), block0, Default::default(), Default::default(), false.into()).0,
+		);
 		let queue = Arc::new(RevalidationQueue::new());
 		let uxt = uxt(Transfer {
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/ b/substrate/client/transaction-pool/src/fork_aware_txpool/
index 55544495612..15253e0d145 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/
@@ -27,13 +27,14 @@ use super::metrics::MetricsLink as PrometheusMetrics;
 use crate::{
-		self, base_pool::TimedTransactionSource, ExtrinsicFor, ExtrinsicHash, IsValidator,
-		ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor,
+		self, base_pool::TimedTransactionSource, BlockHash, ExtrinsicFor, ExtrinsicHash,
+		IsValidator, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor,
 use parking_lot::Mutex;
-use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus};
+use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus, TransactionStatus};
+use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
 use sp_blockchain::HashAndNumber;
 use sp_runtime::{
 	generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError,
@@ -108,13 +109,139 @@ impl<ChainApi: graph::ChainApi> FinishRevalidationWorkerChannels<ChainApi> {
+/// Single event used in aggregated stream. Tuple containing hash of transactions and its status.
+pub(super) type TransactionStatusEvent<H, BH> = (H, TransactionStatus<H, BH>);
+/// Warning threshold for (unbounded) channel used in aggregated view's streams.
+const VIEW_STREAM_WARN_THRESHOLD: usize = 100_000;
+/// Stream of events providing statuses of all the transactions within the pool.
+pub(super) type AggregatedStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>;
+/// Type alias for a stream of events intended to track dropped transactions.
+type DroppedMonitoringStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>;
+/// Notification handler for transactions updates triggered in `ValidatedPool`.
+/// `ViewPoolObserver` handles transaction status changes notifications coming from an instance of
+/// validated pool associated with the `View` and forwards them through specified channels
+/// into the View's streams.
+pub(super) struct ViewPoolObserver<ChainApi: graph::ChainApi> {
+	/// The sink used to notify dropped by enforcing limits or by being usurped transactions.
+	///
+	/// Note: Ready and future statuses are alse communicated through this channel, enabling the
+	/// stream consumer to track views that reference the transaction.
+	dropped_stream_sink: TracingUnboundedSender<
+		TransactionStatusEvent<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+	>,
+	/// The sink of the single, merged stream providing updates for all the transactions in the
+	/// associated pool.
+	///
+	/// Note: some of the events which are currently ignored on the other side of this channel
+	/// (external watcher) are not relayed.
+	aggregated_stream_sink: TracingUnboundedSender<
+		TransactionStatusEvent<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+	>,
+impl<C: graph::ChainApi> graph::EventHandler<C> for ViewPoolObserver<C> {
+	// note: skipped, notified by ForkAwareTxPool directly to multi view listener.
+	fn broadcasted(&self, _: ExtrinsicHash<C>, _: Vec<String>) {}
+	fn dropped(&self, _: ExtrinsicHash<C>) {}
+	fn invalid(&self, _: ExtrinsicHash<C>) {}
+	fn finalized(&self, _: ExtrinsicHash<C>, _: BlockHash<C>, _: usize) {}
+	fn retracted(&self, _: ExtrinsicHash<C>, _: BlockHash<C>) {
+		// note: [#5479], we do not send to aggregated stream.
+	}
+	fn ready(&self, tx: ExtrinsicHash<C>) {
+		let status = TransactionStatus::Ready;
+		self.send_to_dropped_stream_sink(tx, status.clone());
+		self.send_to_aggregated_stream_sink(tx, status);
+	}
+	fn future(&self, tx: ExtrinsicHash<C>) {
+		let status = TransactionStatus::Future;
+		self.send_to_dropped_stream_sink(tx, status.clone());
+		self.send_to_aggregated_stream_sink(tx, status);
+	}
+	fn limits_enforced(&self, tx: ExtrinsicHash<C>) {
+		let status = TransactionStatus::Dropped;
+		self.send_to_dropped_stream_sink(tx, status);
+	}
+	fn usurped(&self, tx: ExtrinsicHash<C>, by: ExtrinsicHash<C>) {
+		let status = TransactionStatus::Usurped(by.clone());
+		self.send_to_dropped_stream_sink(tx, status);
+	}
+	fn pruned(&self, tx: ExtrinsicHash<C>, block_hash: BlockHash<C>, tx_index: usize) {
+		let status = TransactionStatus::InBlock((block_hash, tx_index));
+		self.send_to_aggregated_stream_sink(tx, status);
+	}
+	fn finality_timeout(&self, tx: ExtrinsicHash<C>, hash: BlockHash<C>) {
+		let status = TransactionStatus::FinalityTimeout(hash);
+		//todo: do we need this? [related issue: #5482]
+		self.send_to_aggregated_stream_sink(tx, status);
+	}
+impl<ChainApi: graph::ChainApi> ViewPoolObserver<ChainApi> {
+	/// Creates an instance of `ViewPoolObserver` together with associated view's streams.
+	///
+	/// This methods creates an event handler that shall be registered in the `ValidatedPool`
+	/// instance associated with the view. It also creates new view's streams:
+	/// - a single stream intended to watch dropped transactions only. The stream can be used to
+	///   subscribe to events related to dropping of all extrinsics in the pool.
+	/// - a single merged stream for all extrinsics in the associated pool. The stream can be used
+	/// to subscribe to life-cycle events of all extrinsics in the pool. For fork-aware
+	/// pool implementation this approach seems to be more efficient than using individual
+	/// streams for every transaction.
+	fn new() -> (
+		Self,
+		DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+		AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+	) {
+		let (dropped_stream_sink, dropped_stream) =
+			tracing_unbounded("mpsc_txpool_watcher", VIEW_STREAM_WARN_THRESHOLD);
+		let (aggregated_stream_sink, aggregated_stream) =
+			tracing_unbounded("mpsc_txpool_aggregated_stream", VIEW_STREAM_WARN_THRESHOLD);
+		(Self { dropped_stream_sink, aggregated_stream_sink }, dropped_stream, aggregated_stream)
+	}
+	/// Sends given event to the `dropped_stream_sink`.
+	fn send_to_dropped_stream_sink(
+		&self,
+		tx: ExtrinsicHash<ChainApi>,
+		status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+	) {
+		if let Err(e) = self.dropped_stream_sink.unbounded_send((tx.clone(), status.clone())) {
+			trace!(target: LOG_TARGET, "[{:?}] dropped_sink: {:?} send message failed: {:?}", tx, status, e);
+		}
+	}
+	/// Sends given event to the `aggregated_stream_sink`.
+	fn send_to_aggregated_stream_sink(
+		&self,
+		tx: ExtrinsicHash<ChainApi>,
+		status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+	) {
+		if let Err(e) = self.aggregated_stream_sink.unbounded_send((tx.clone(), status.clone())) {
+			trace!(target: LOG_TARGET, "[{:?}] aggregated_stream {:?} send message failed: {:?}", tx, status, e);
+		}
+	}
 /// Represents the state of transaction pool for given block.
 /// Refer to [*View*](../index.html#view) section for more details on the purpose and life cycle of
 /// the `View`.
 pub(super) struct View<ChainApi: graph::ChainApi> {
 	/// The internal pool keeping the set of ready and future transaction at the given block.
-	pub(super) pool: graph::Pool<ChainApi>,
+	pub(super) pool: graph::Pool<ChainApi, ViewPoolObserver<ChainApi>>,
 	/// The hash and number of the block with which this view is associated.
 	pub(super) at: HashAndNumber<ChainApi::Block>,
 	/// Endpoints of communication channel with background worker.
@@ -135,24 +262,50 @@ where
 		options: graph::Options,
 		metrics: PrometheusMetrics,
 		is_validator: IsValidator,
-	) -> Self {
+	) -> (
+		Self,
+		DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+		AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+	) {|metrics|;
-		Self {
-			pool: graph::Pool::new(options, is_validator, api),
-			at,
-			revalidation_worker_channels: Mutex::from(None),
-			metrics,
-		}
+		let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new();
+		(
+			Self {
+				pool: graph::Pool::new_with_event_handler(
+					options,
+					is_validator,
+					api,
+					event_handler,
+				),
+				at,
+				revalidation_worker_channels: Mutex::from(None),
+				metrics,
+			},
+			dropped_stream,
+			aggregated_stream,
+		)
 	/// Creates a copy of the other view.
-	pub(super) fn new_from_other(&self, at: &HashAndNumber<ChainApi::Block>) -> Self {
-		View {
-			at: at.clone(),
-			pool: self.pool.deep_clone(),
-			revalidation_worker_channels: Mutex::from(None),
-			metrics: self.metrics.clone(),
-		}
+	pub(super) fn new_from_other(
+		&self,
+		at: &HashAndNumber<ChainApi::Block>,
+	) -> (
+		Self,
+		DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+		AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+	) {
+		let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new();
+		(
+			View {
+				at: at.clone(),
+				pool: self.pool.deep_clone_with_event_handler(event_handler),
+				revalidation_worker_channels: Mutex::from(None),
+				metrics: self.metrics.clone(),
+			},
+			dropped_stream,
+			aggregated_stream,
+		)
 	/// Imports single unvalidated extrinsic into the view.
@@ -498,7 +651,10 @@ where
 		listener_action: F,
 	) -> Vec<ExtrinsicHash<ChainApi>>
-		F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>),
+		F: Fn(
+			&mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
+			ExtrinsicHash<ChainApi>,
+		),
 		self.pool.validated_pool().remove_subtree(tx_hash, listener_action)
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/ b/substrate/client/transaction-pool/src/fork_aware_txpool/
index e534decf9b1..6688ac549dc 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/
@@ -20,7 +20,7 @@
 use super::{
 	multi_view_listener::{MultiViewListener, TxStatusStream},
-	view::View,
+	view::{View, ViewPoolObserver},
 use crate::{
@@ -58,8 +58,13 @@ where
 /// Helper type representing the callback allowing to trigger per-transaction events on
 /// `ValidatedPool`'s listener.
-type RemovalListener<ChainApi> =
-	Arc<dyn Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>) + Send + Sync>;
+type RemovalCallback<ChainApi> = Arc<
+	dyn Fn(
+			&mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
+			ExtrinsicHash<ChainApi>,
+		) + Send
+		+ Sync,
 /// Helper struct to maintain the context for pending transaction removal, executed for
 /// newly inserted views.
@@ -70,7 +75,7 @@ where
 	/// Hash of the transaction that will be removed,
 	xt_hash: ExtrinsicHash<ChainApi>,
 	/// Action that shall be executed on underlying `ValidatedPool`'s listener.
-	listener_action: RemovalListener<ChainApi>,
+	listener_action: RemovalCallback<ChainApi>,
 /// This enum represents an action that should be executed on the newly built
@@ -115,7 +120,7 @@ where
 	/// Creates new unprocessed instance of pending transaction removal.
 	fn new_removal_action(
 		xt_hash: ExtrinsicHash<ChainApi>,
-		listener: RemovalListener<ChainApi>,
+		listener: RemovalCallback<ChainApi>,
 	) -> Self {
 		Self {
 			processed: false,
@@ -806,8 +811,10 @@ where
 		listener_action: F,
 	) -> Vec<ExtrinsicHash<ChainApi>>
-		F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>)
-			+ Clone
+		F: Fn(
+				&mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>,
+				ExtrinsicHash<ChainApi>,
+			) + Clone
 			+ Send
 			+ Sync
 			+ 'static,