From 560db28c987dd1e634119788ebc8318967df206b Mon Sep 17 00:00:00 2001
From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>
Date: Mon, 11 Nov 2024 13:03:41 +0100
Subject: [PATCH] dropped_watcher: dropping future transcations improved

---
 .../src/fork_aware_txpool/dropped_watcher.rs  | 118 ++++++++++++++----
 .../fork_aware_txpool/multi_view_listener.rs  |   1 +
 .../src/fork_aware_txpool/tx_mem_pool.rs      |   2 +-
 .../src/fork_aware_txpool/view_store.rs       |   7 +-
 4 files changed, 103 insertions(+), 25 deletions(-)

diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
index 390d552cc03..8de10555ea3 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
@@ -33,7 +33,10 @@ use sc_transaction_pool_api::TransactionStatus;
 use sc_utils::mpsc;
 use sp_runtime::traits::Block as BlockT;
 use std::{
-	collections::{hash_map::Entry, HashMap, HashSet},
+	collections::{
+		hash_map::{Entry, OccupiedEntry},
+		HashMap, HashSet,
+	},
 	fmt::{self, Debug, Formatter},
 	pin::Pin,
 };
@@ -98,7 +101,7 @@ where
 	AddView(BlockHash<ChainApi>, ViewStream<ChainApi>),
 	/// Removes an existing view's stream associated with a specific block hash.
 	RemoveView(BlockHash<ChainApi>),
-	/// Removes internal states for given extrinsic hashes.
+	/// Removes referencing views for given extrinsic hashes.
 	///
 	/// Intended to ba called on finalization.
 	RemoveFinalizedTxs(Vec<ExtrinsicHash<ChainApi>>),
@@ -129,16 +132,29 @@ where
 	/// A map that associates the views identified by corresponding block hashes with their streams
 	/// of dropped-related events. This map is used to keep track of active views and their event
 	/// streams.
+	/// todo: rename: view_stream map
 	stream_map: StreamMap<BlockHash<ChainApi>, ViewStream<ChainApi>>,
 	/// A receiver for commands to control the state of the stream, allowing the addition and
 	/// removal of views. This is used to dynamically update which views are being tracked.
 	command_receiver: CommandReceiver<Command<ChainApi>>,
-
 	/// For each transaction hash we keep the set of hashes representing the views that see this
-	/// transaction as ready or future.
+	/// transaction as ready or in_block.
+	///
+	/// Even if all views referencing a ready transactions are removed, we still want to keep
+	/// transaction, there can be a fork which sees the transaction as ready.
+	///
+	/// Once transaction is dropped, dropping view is removed from the set.
+	ready_transaction_views: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
+	/// For each transaction hash we keep the set of hashes representing the views that see this
+	/// transaction as future.
+	///
+	/// Once all views referencing a future transactions are removed, the future can be dropped.
 	///
 	/// Once transaction is dropped, dropping view is removed from the set.
-	transaction_states: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
+	future_transaction_views: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
+
+	/// Transactions that need to be notified as dropped.
+	pending_dropped_transactions: Vec<ExtrinsicHash<ChainApi>>,
 }
 
 impl<C> MultiViewDropWatcherContext<C>
@@ -146,6 +162,23 @@ where
 	C: graph::ChainApi + 'static,
 	<<C as graph::ChainApi>::Block as BlockT>::Hash: Unpin,
 {
+	/// Provides the ready or future `HashSet` containing views referencing given transaction.
+	fn get_transaction_views(
+		&mut self,
+		tx_hash: ExtrinsicHash<C>,
+	) -> Option<OccupiedEntry<ExtrinsicHash<C>, HashSet<BlockHash<C>>>> {
+		if let Entry::Occupied(views_keeping_tx_valid) = self.ready_transaction_views.entry(tx_hash)
+		{
+			return Some(views_keeping_tx_valid)
+		}
+		if let Entry::Occupied(views_keeping_tx_valid) =
+			self.future_transaction_views.entry(tx_hash)
+		{
+			return Some(views_keeping_tx_valid)
+		}
+		None
+	}
+
 	/// Processes a `ViewStreamEvent` from a specific view and updates the internal state
 	/// accordingly.
 	///
@@ -164,13 +197,19 @@ where
 		);
 		let (tx_hash, status) = event;
 		match status {
-			TransactionStatus::Ready | TransactionStatus::Future => {
-				self.transaction_states.entry(tx_hash).or_default().insert(block_hash);
+			TransactionStatus::Future => {
+				self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash);
+			},
+			TransactionStatus::Ready | TransactionStatus::InBlock(..) => {
+				if let Some(mut views) = self.future_transaction_views.remove(&tx_hash) {
+					views.insert(block_hash);
+					self.ready_transaction_views.insert(tx_hash, views);
+				} else {
+					self.ready_transaction_views.entry(tx_hash).or_default().insert(block_hash);
+				}
 			},
 			TransactionStatus::Dropped => {
-				if let Entry::Occupied(mut views_keeping_tx_valid) =
-					self.transaction_states.entry(tx_hash)
-				{
+				if let Some(mut views_keeping_tx_valid) = self.get_transaction_views(tx_hash) {
 					views_keeping_tx_valid.get_mut().remove(&block_hash);
 					if views_keeping_tx_valid.get().is_empty() {
 						return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
@@ -186,7 +225,7 @@ where
 			//    replace it with new one (also in mempool).
 			TransactionStatus::Usurped(by) => {
 				if let Entry::Occupied(mut views_keeping_tx_valid) =
-					self.transaction_states.entry(tx_hash)
+					self.ready_transaction_views.entry(tx_hash)
 				{
 					views_keeping_tx_valid.get_mut().remove(&block_hash);
 					if views_keeping_tx_valid.get().is_empty() {
@@ -202,6 +241,25 @@ where
 		None
 	}
 
+	/// Gets pending dropped transactions if any.
+	fn get_pending_dropped_transaction(&mut self) -> Option<DroppedTransaction<ExtrinsicHash<C>>> {
+		while let Some(tx_hash) = self.pending_dropped_transactions.pop() {
+			// never drop transaction that was seens as ready. It may not have a referencing
+			// view now, but such fork can appear.
+			if let Some(_) = self.ready_transaction_views.get(&tx_hash) {
+				continue
+			}
+
+			if let Some(views) = self.future_transaction_views.get(&tx_hash) {
+				if views.is_empty() {
+					self.future_transaction_views.remove(&tx_hash);
+					return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
+				}
+			}
+		}
+		return None
+	}
+
 	/// Creates a new `StreamOfDropped` and its associated event stream controller.
 	///
 	/// This method initializes the internal structures and unfolds the stream of dropped
@@ -218,13 +276,25 @@ where
 		let ctx = Self {
 			stream_map: StreamMap::new(),
 			command_receiver,
-			transaction_states: Default::default(),
+			ready_transaction_views: Default::default(),
+			future_transaction_views: Default::default(),
+			pending_dropped_transactions: Default::default(),
 		};
 
 		let stream_map = futures::stream::unfold(ctx, |mut ctx| async move {
 			loop {
+				if let Some(dropped) = ctx.get_pending_dropped_transaction() {
+					debug!("dropped_watcher: sending out (pending): {dropped:?}");
+					return Some((dropped, ctx));
+				}
 				tokio::select! {
 					biased;
+					Some(event) = next_event(&mut ctx.stream_map) => {
+						if let Some(dropped) = ctx.handle_event(event.0, event.1) {
+							debug!("dropped_watcher: sending out: {dropped:?}");
+							return Some((dropped, ctx));
+						}
+					},
 					cmd = ctx.command_receiver.next() => {
 						match cmd? {
 							Command::AddView(key,stream) => {
@@ -234,26 +304,30 @@ where
 							Command::RemoveView(key) => {
 								trace!(target: LOG_TARGET,"dropped_watcher: Command::RemoveView {key:?} views:{:?}",ctx.stream_map.keys().collect::<Vec<_>>());
 								ctx.stream_map.remove(&key);
-								ctx.transaction_states.iter_mut().for_each(|(_,state)| {
-									state.remove(&key);
+								ctx.ready_transaction_views.iter_mut().for_each(|(tx_hash,views)| {
+									trace!(target: LOG_TARGET,"[{:?}] dropped_watcher: Command::RemoveView ready views: {:?}",tx_hash, views);
+									views.remove(&key);
+								});
+
+								ctx.future_transaction_views.iter_mut().for_each(|(tx_hash,views)| {
+									trace!(target: LOG_TARGET,"[{:?}] dropped_watcher: Command::RemoveView future views: {:?}",tx_hash, views);
+									views.remove(&key);
+									if views.is_empty() {
+										ctx.pending_dropped_transactions.push(*tx_hash);
+									}
 								});
 							},
 							Command::RemoveFinalizedTxs(xts) => {
 								log_xt_trace!(target: LOG_TARGET, xts.clone(), "[{:?}] dropped_watcher: finalized xt removed");
 								xts.iter().for_each(|xt| {
-									ctx.transaction_states.remove(xt);
+									ctx.ready_transaction_views.remove(xt);
+									ctx.future_transaction_views.remove(xt);
 								});
 
 							},
 						}
-					},
-
-					Some(event) = next_event(&mut ctx.stream_map) => {
-						if let Some(dropped) = ctx.handle_event(event.0, event.1) {
-							debug!("dropped_watcher: sending out: {dropped:?}");
-							return Some((dropped, ctx));
-						}
 					}
+
 				}
 			}
 		})
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
index 081afccf2d7..52acb22d8a6 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
@@ -270,6 +270,7 @@ where
 	/// stream map.
 	fn remove_view(&mut self, block_hash: BlockHash<ChainApi>) {
 		self.status_stream_map.remove(&block_hash);
+		self.views_keeping_tx_valid.remove(&block_hash);
 		trace!(target: LOG_TARGET, "[{:?}] RemoveView view: {:?} views:{:?}", self.tx_hash, block_hash, self.status_stream_map.keys().collect::<Vec<_>>());
 	}
 }
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
index be00471317b..16952afe64b 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
@@ -401,7 +401,7 @@ where
 
 		log::debug!(
 			target: LOG_TARGET,
-			"mempool::revalidate: at {finalized_block:?} count:{input_len}/{count} purged:{} took {duration:?}", invalid_hashes.len(),
+			"mempool::revalidate: at {finalized_block:?} count:{input_len}/{count} invalid_hashes:{} took {duration:?}", invalid_hashes.len(),
 		);
 
 		invalid_hashes
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
index 8f618fde561..e2af6d3851b 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
@@ -471,13 +471,16 @@ where
 			log::trace!(target:LOG_TARGET,"handle_finalized: inactive_views: {:?}", inactive_views.keys());
 		}
 
+		log::trace!(target:LOG_TARGET,"handle_finalized: dropped_views: {:?}", dropped_views);
+
+		self.listener.remove_stale_controllers();
+		self.dropped_stream_controller.remove_finalized_txs(finalized_xts.clone());
+
 		self.listener.remove_view(finalized_hash);
 		for view in dropped_views {
 			self.listener.remove_view(view);
 			self.dropped_stream_controller.remove_view(view);
 		}
-		self.listener.remove_stale_controllers();
-		self.dropped_stream_controller.remove_finalized_txs(finalized_xts.clone());
 
 		finalized_xts
 	}
-- 
GitLab