From 2b18e080cfcd6b56ee638c729f891154e566e52e Mon Sep 17 00:00:00 2001
From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>
Date: Thu, 23 Jan 2025 16:52:58 +0100
Subject: [PATCH] integrated

---
 .../fork_aware_txpool/fork_aware_txpool.rs    | 88 ++++++-------------
 .../src/fork_aware_txpool/tx_mem_pool.rs      |  6 +-
 .../src/fork_aware_txpool/view.rs             | 16 +---
 .../src/fork_aware_txpool/view_store.rs       | 25 +-----
 .../transaction-pool/tests/fatp_limits.rs     |  3 +-
 5 files changed, 34 insertions(+), 104 deletions(-)

diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
index 76604571825..38f395c7b58 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
@@ -192,7 +192,9 @@ where
 		future_limits: crate::PoolLimit,
 		mempool_max_transactions_count: usize,
 	) -> (Self, ForkAwareTxPoolTask) {
-		let listener = Arc::from(MultiViewListener::new());
+		let (listener, listener_task) = MultiViewListener::new_with_worker();
+		let listener = Arc::new(listener);
+
 		let (import_notification_sink, import_notification_sink_task) =
 			MultiViewImportNotificationSink::new_with_worker();
 
@@ -219,6 +221,7 @@ where
 
 		let combined_tasks = async move {
 			tokio::select! {
+				_ = listener_task => {},
 				_ = import_notification_sink_task => {},
 				_ = dropped_monitor_task => {}
 			}
@@ -312,7 +315,10 @@ where
 		finalized_hash: Block::Hash,
 	) -> Self {
 		let metrics = PrometheusMetrics::new(prometheus);
-		let listener = Arc::from(MultiViewListener::new());
+
+		let (listener, listener_task) = MultiViewListener::new_with_worker();
+		let listener = Arc::new(listener);
+
 		let (revalidation_queue, revalidation_task) =
 			revalidation_worker::RevalidationQueue::new_with_worker();
 
@@ -341,6 +347,7 @@ where
 
 		let combined_tasks = async move {
 			tokio::select! {
+				_ = listener_task => {}
 				_ = revalidation_task => {},
 				_ = import_notification_sink_task => {},
 				_ = dropped_monitor_task => {}
@@ -1019,6 +1026,7 @@ where
 			)
 		};
 
+		let start = Instant::now();
 		// 1. Capture all import notification from the very beginning, so first register all
 		//the listeners.
 		self.import_notification_sink.add_view(
@@ -1031,26 +1039,27 @@ where
 			view.pool.validated_pool().create_dropped_by_limits_stream().boxed(),
 		);
 
-		let start = Instant::now();
-		let watched_xts = self.register_listeners(&mut view).await;
-		let duration = start.elapsed();
+		//todo [#5495]: maybe we don't need to register listener in view? We could use
+		// multi_view_listener.transaction_in_block
+		self.view_store.listener.add_view_aggregated_stream(
+			view.at.hash,
+			view.pool.validated_pool().create_aggregated_stream().boxed(),
+		);
 		// sync the transactions statuses and referencing views in all the listeners with newly
 		// cloned view.
 		view.pool.validated_pool().retrigger_notifications();
-		log::debug!(target: LOG_TARGET, "register_listeners: at {at:?} took {duration:?}");
+		log::debug!(target: LOG_TARGET, "register listeners: at {at:?} took {:?}", start.elapsed());
 
 		// 2. Handle transactions from the tree route. Pruning transactions from the view first
 		// will make some space for mempool transactions in case we are at the view's limits.
 		let start = Instant::now();
 		self.update_view_with_fork(&view, tree_route, at.clone()).await;
-		let duration = start.elapsed();
-		log::debug!(target: LOG_TARGET, "update_view_with_fork: at {at:?} took {duration:?}");
+		log::debug!(target: LOG_TARGET, "update_view_with_fork: at {at:?} took {:?}", start.elapsed());
 
 		// 3. Finally, submit transactions from the mempool.
 		let start = Instant::now();
-		self.update_view_with_mempool(&mut view, watched_xts).await;
-		let duration = start.elapsed();
-		log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {at:?} took {duration:?}");
+		self.update_view_with_mempool(&mut view).await;
+		log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {at:?} took {:?}", start.elapsed());
 
 		let view = Arc::from(view);
 		self.view_store.insert_new_view(view.clone(), tree_route).await;
@@ -1096,48 +1105,6 @@ where
 		all_extrinsics
 	}
 
-	/// For every watched transaction in the mempool registers a transaction listener in the view.
-	///
-	/// The transaction listener for a given view is also added to multi-view listener. This allows
-	/// to track aggreagated progress of the transaction within the transaction pool.
-	///
-	/// Function returns a list of currently watched transactions in the mempool.
-	async fn register_listeners(
-		&self,
-		view: &View<ChainApi>,
-	) -> Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)> {
-		log::debug!(
-			target: LOG_TARGET,
-			"register_listeners: {:?} xts:{:?} v:{}",
-			view.at,
-			self.mempool.unwatched_and_watched_count(),
-			self.active_views_count()
-		);
-
-		//todo [#5495]: maybe we don't need to register listener in view? We could use
-		// multi_view_listener.transaction_in_block
-		let results = self
-			.mempool
-			.clone_watched()
-			.into_iter()
-			.map(|(tx_hash, tx)| {
-				let watcher = view.create_watcher(tx_hash);
-				let at = view.at.clone();
-				async move {
-					log::trace!(target: LOG_TARGET, "[{:?}] adding watcher {:?}", tx_hash, at.hash);
-					self.view_store.listener.add_view_watcher_for_tx(
-						tx_hash,
-						at.hash,
-						watcher.into_stream().boxed(),
-					);
-					(tx_hash, tx)
-				}
-			})
-			.collect::<Vec<_>>();
-
-		future::join_all(results).await
-	}
-
 	/// Updates the given view with the transactions from the internal mempol.
 	///
 	/// All transactions from the mempool (excluding those which are either already imported or
@@ -1147,15 +1114,10 @@ where
 	/// If there are no views, and mempool transaction is reported as invalid for the given view,
 	/// the transaction is reported as invalid and removed from the mempool. This does not apply to
 	/// stale and temporarily banned transactions.
-	///
-	/// As the listeners for watched transactions were registered at the very beginning of maintain
-	/// procedure (`register_listeners`), this function accepts the list of watched transactions
-	/// from the mempool for which listener was actually registered to avoid submit/maintain races.
-	async fn update_view_with_mempool(
-		&self,
-		view: &View<ChainApi>,
-		watched_xts: Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)>,
-	) {
+	async fn update_view_with_mempool(&self, view: &View<ChainApi>) {
+		let watched_xts: Vec<(ExtrinsicHash<ChainApi>, Arc<TxInMemPool<ChainApi, Block>>)> =
+			self.mempool.clone_watched().into_iter().collect();
+
 		log::debug!(
 			target: LOG_TARGET,
 			"update_view_with_mempool: {:?} xts:{:?} v:{}",
@@ -1203,7 +1165,7 @@ where
 		if self.view_store.is_empty() {
 			for result in watched_results {
 				if let Err(tx_hash) = result {
-					self.view_store.listener.invalidate_transactions(&[tx_hash]);
+					self.view_store.listener.transactions_invalidated(&[tx_hash]);
 					self.mempool.remove_transaction(&tx_hash);
 				}
 			}
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
index c8a4d0c72dd..1b3e79c889d 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
@@ -272,7 +272,7 @@ where
 	}
 
 	/// Creates a new `TxMemPool` instance for testing purposes.
-	#[allow(dead_code)]
+	#[cfg(test)]
 	fn new_test(
 		api: Arc<ChainApi>,
 		max_transactions_count: usize,
@@ -280,7 +280,7 @@ where
 	) -> Self {
 		Self {
 			api,
-			listener: Arc::from(MultiViewListener::new()),
+			listener: Arc::from(MultiViewListener::new_with_worker().0),
 			transactions: Default::default(),
 			metrics: Default::default(),
 			max_transactions_count,
@@ -585,7 +585,7 @@ where
 		invalid_hashes.iter().for_each(|i| {
 			transactions.remove(i);
 		});
-		self.listener.invalidate_transactions(&invalid_hashes);
+		self.listener.transactions_invalidated(&invalid_hashes);
 	}
 
 	/// Updates the priority of transaction stored in mempool using provided view_store submission
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
index a35d68120a3..185a36fcd4f 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
@@ -27,8 +27,8 @@ use super::metrics::MetricsLink as PrometheusMetrics;
 use crate::{
 	common::log_xt::log_xt_trace,
 	graph::{
-		self, base_pool::TimedTransactionSource, watcher::Watcher, ExtrinsicFor, ExtrinsicHash,
-		IsValidator, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor,
+		self, base_pool::TimedTransactionSource, ExtrinsicFor, ExtrinsicHash, IsValidator,
+		ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor,
 	},
 	LOG_TARGET,
 };
@@ -227,18 +227,6 @@ where
 		self.pool.validated_pool().status()
 	}
 
-	/// Creates a watcher for given transaction.
-	///
-	/// Intended to be called for the transaction that already exists in the pool
-	pub(super) fn create_watcher(
-		&self,
-		tx_hash: ExtrinsicHash<ChainApi>,
-	) -> Watcher<ExtrinsicHash<ChainApi>, ExtrinsicHash<ChainApi>> {
-		//todo(minor): some assert could be added here - to make sure that transaction actually
-		// exists in the view.
-		self.pool.validated_pool().create_watcher(tx_hash)
-	}
-
 	/// Revalidates some part of transaction from the internal pool.
 	///
 	/// Intended to be called from the revalidation worker. The revalidation process can be
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
index 43ed5bbf886..1800a5cbcf1 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
@@ -31,7 +31,6 @@ use crate::{
 	},
 	ReadyIteratorFor, LOG_TARGET,
 };
-use futures::prelude::*;
 use itertools::Itertools;
 use parking_lot::RwLock;
 use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus};
@@ -292,19 +291,7 @@ where
 					let view = view.clone();
 					let xt = xt.clone();
 					let source = source.clone();
-					async move {
-						match view.submit_and_watch(source, xt).await {
-							Ok(mut result) => {
-								self.listener.add_view_watcher_for_tx(
-									tx_hash,
-									view.at.hash,
-									result.expect_watcher().into_stream().boxed(),
-								);
-								Ok(result)
-							},
-							Err(e) => Err(e),
-						}
-					}
+					async move { view.submit_and_watch(source, xt).await }
 				})
 				.collect::<Vec<_>>()
 		};
@@ -443,7 +430,7 @@ where
 			extrinsics
 				.iter()
 				.enumerate()
-				.for_each(|(i, tx_hash)| self.listener.finalize_transaction(*tx_hash, *block, i));
+				.for_each(|(i, tx_hash)| self.listener.transaction_finalized(*tx_hash, *block, i));
 
 			finalized_transactions.extend(extrinsics);
 		}
@@ -728,13 +715,7 @@ where
 	) {
 		if watched {
 			match view.submit_and_watch(source, xt).await {
-				Ok(mut result) => {
-					self.listener.add_view_watcher_for_tx(
-						xt_hash,
-						view.at.hash,
-						result.expect_watcher().into_stream().boxed(),
-					);
-				},
+				Ok(_) => (),
 				Err(e) => {
 					log::trace!(
 						target:LOG_TARGET,
diff --git a/substrate/client/transaction-pool/tests/fatp_limits.rs b/substrate/client/transaction-pool/tests/fatp_limits.rs
index fb02b21ebc2..50e75e1e28e 100644
--- a/substrate/client/transaction-pool/tests/fatp_limits.rs
+++ b/substrate/client/transaction-pool/tests/fatp_limits.rs
@@ -377,12 +377,11 @@ fn fatp_limits_watcher_view_can_drop_transcation() {
 	assert_eq!(xt0_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped,]);
 
 	assert_ready_iterator!(header01.hash(), pool, [xt1, xt2]);
+	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
 
 	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
 	block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash())));
 
-	let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();
-
 	let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::<Vec<_>>();
 	assert_eq!(xt1_status, vec![TransactionStatus::Ready, TransactionStatus::Dropped]);
 
-- 
GitLab