From 48214a381438f9b78653b8995bb4e62df9da504a Mon Sep 17 00:00:00 2001
From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>
Date: Wed, 5 Feb 2025 15:55:40 +0100
Subject: [PATCH] Dropped(invalid) supported

When view report transaction as invalid it shall be treated as dropped.
This commit adds the support for this.
---
 .../src/fork_aware_txpool/dropped_watcher.rs  | 20 ++++++++++-
 .../fork_aware_txpool/fork_aware_txpool.rs    |  8 +++--
 .../fork_aware_txpool/multi_view_listener.rs  |  5 ++-
 .../transaction-pool/src/graph/listener.rs    |  5 ++-
 .../client/transaction-pool/tests/fatp.rs     | 34 ++++++++-----------
 5 files changed, 47 insertions(+), 25 deletions(-)

diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
index be20a160896..9e078617635 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
@@ -62,6 +62,11 @@ impl<Hash> DroppedTransaction<Hash> {
 	pub fn new_enforced_by_limts(tx_hash: Hash) -> Self {
 		Self { reason: DroppedReason::LimitsEnforced, tx_hash }
 	}
+
+	/// Creates a new instance with reason set to `DroppedReason::Invalid`.
+	pub fn new_invalid(tx_hash: Hash) -> Self {
+		Self { reason: DroppedReason::Invalid, tx_hash }
+	}
 }
 
 /// Provides reason of why transactions was dropped.
@@ -71,6 +76,8 @@ pub enum DroppedReason<Hash> {
 	Usurped(Hash),
 	/// Transaction was dropped because of internal pool limits being enforced.
 	LimitsEnforced,
+	/// Transaction was dropped because of being invalid.
+	Invalid,
 }
 
 /// Dropped-logic related event from the single view.
@@ -279,12 +286,23 @@ where
 						return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
 					}
 				} else {
-					debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked) tx");
+					debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked dropped) tx");
 					return Some(DroppedTransaction::new_enforced_by_limts(tx_hash))
 				}
 			},
 			TransactionStatus::Usurped(by) =>
 				return Some(DroppedTransaction::new_usurped(tx_hash, by)),
+			TransactionStatus::Invalid => {
+				if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) {
+					views_keeping_tx_valid.get_mut().remove(&block_hash);
+					if views_keeping_tx_valid.get().is_empty() {
+						return Some(DroppedTransaction::new_invalid(tx_hash))
+					}
+				} else {
+					debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked invalid) tx");
+					return Some(DroppedTransaction::new_invalid(tx_hash))
+				}
+			},
 			_ => {},
 		};
 		None
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
index 7938d875015..85c517e7e0d 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
@@ -289,14 +289,16 @@ where
 							tx_hash = ?new_tx_hash,
 							"error: dropped_monitor_task: no entry in mempool for new transaction"
 						);
-					}
+					};
+				},
+				DroppedReason::LimitsEnforced | DroppedReason::Invalid => {
+					view_store.remove_transaction_subtree(tx_hash, |_, _| {});
 				},
-				DroppedReason::LimitsEnforced => {},
 			};
 
 			mempool.remove_transaction(&tx_hash);
-			view_store.listener.transaction_dropped(dropped);
 			import_notification_sink.clean_notified_items(&[tx_hash]);
+			view_store.listener.transaction_dropped(dropped);
 		}
 	}
 
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
index 1ef733f3565..1f7635e1a13 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
@@ -307,8 +307,11 @@ where
 				self.terminate = true;
 				return Some(TransactionStatus::Usurped(by))
 			},
+			TransactionStatusUpdate::TransactionDropped(_, DroppedReason::Invalid) => {
+				self.terminate = true;
+				return Some(TransactionStatus::Invalid)
+			},
 		};
-		None
 	}
 
 	/// Handles various transaction status updates from individual views and manages internal states
diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs
index 0e70334ea0e..340b6d429ae 100644
--- a/substrate/client/transaction-pool/src/graph/listener.rs
+++ b/substrate/client/transaction-pool/src/graph/listener.rs
@@ -43,7 +43,8 @@ pub struct Listener<H: hash::Hash + Eq, C: ChainApi> {
 	watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>,
 	finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>,
 
-	/// The sink used to notify dropped by enforcing limits or by being usurped transactions.
+	/// The sink used to notify dropped by enforcing limits or by being usurped, or invalid
+	/// transactions.
 	///
 	/// Note: Ready and future statuses are alse communicated through this channel, enabling the
 	/// stream consumer to track views that reference the transaction.
@@ -195,6 +196,8 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H
 	pub fn invalid(&mut self, tx: &H) {
 		trace!(target: LOG_TARGET, "[{:?}] Extrinsic invalid", tx);
 		self.fire(tx, |watcher| watcher.invalid());
+
+		self.send_to_dropped_stream_sink(tx, TransactionStatus::Invalid);
 	}
 
 	/// Transaction was pruned from the pool.
diff --git a/substrate/client/transaction-pool/tests/fatp.rs b/substrate/client/transaction-pool/tests/fatp.rs
index ecdf2b0a083..a4a932dd853 100644
--- a/substrate/client/transaction-pool/tests/fatp.rs
+++ b/substrate/client/transaction-pool/tests/fatp.rs
@@ -788,11 +788,12 @@ fn fatp_linear_progress_finalization() {
 	let f00 = forks[0][0].hash();
 	let f12 = forks[1][2].hash();
 	let f14 = forks[1][4].hash();
+	let f15 = forks[1][5].hash();
 
 	let event = new_best_block_event(&pool, None, f00);
 	block_on(pool.maintain(event));
 
-	let xt0 = uxt(Bob, 204);
+	let xt0 = uxt(Bob, 205);
 	let submissions = vec![pool.submit_one(invalid_hash(), SOURCE, xt0.clone())];
 	block_on(futures::future::join_all(submissions));
 
@@ -803,13 +804,13 @@ fn fatp_linear_progress_finalization() {
 
 	log::debug!(target:LOG_TARGET, "stats: {:#?}", pool.status_all());
 
-	let event = ChainEvent::Finalized { hash: f14, tree_route: Arc::from(vec![]) };
-	block_on(pool.maintain(event));
+	block_on(pool.maintain(new_best_block_event(&pool, Some(f12), f15)));
+	block_on(pool.maintain(finalized_block_event(&pool, f00, f14)));
 
 	log::debug!(target:LOG_TARGET, "stats: {:#?}", pool.status_all());
 
 	assert_eq!(pool.active_views_count(), 1);
-	assert_pool_status!(f14, &pool, 1, 0);
+	assert_pool_status!(f15, &pool, 1, 0);
 }
 
 #[test]
@@ -870,14 +871,12 @@ fn fatp_watcher_future() {
 
 	assert_pool_status!(header01.hash(), &pool, 0, 1);
 
-	let header02 = api.push_block(2, vec![], true);
-	let event = ChainEvent::Finalized {
-		hash: header02.hash(),
-		tree_route: Arc::from(vec![header01.hash()]),
-	};
-	block_on(pool.maintain(event));
+	let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
+	let header03 = api.push_block_with_parent(header02.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header03.hash())));
+	block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash())));
 
-	assert_pool_status!(header02.hash(), &pool, 0, 1);
+	assert_pool_status!(header03.hash(), &pool, 0, 1);
 
 	let xt0_events = block_on(xt0_watcher.take(1).collect::<Vec<_>>());
 	assert_eq!(xt0_events, vec![TransactionStatus::Future]);
@@ -1001,15 +1000,12 @@ fn fatp_watcher_future_and_finalized() {
 
 	assert_pool_status!(header01.hash(), &pool, 1, 1);
 
-	let header02 = api.push_block(2, vec![xt0], true);
-	let event = ChainEvent::Finalized {
-		hash: header02.hash(),
-		tree_route: Arc::from(vec![header01.hash()]),
-	};
-	// let event = new_best_block_event(&pool, Some(header01.hash()), header02.hash());
-	block_on(pool.maintain(event));
+	let header02 = api.push_block_with_parent(header01.hash(), vec![xt0], true);
+	let header03 = api.push_block_with_parent(header02.hash(), vec![], true);
+	block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header03.hash())));
+	block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash())));
 
-	assert_pool_status!(header02.hash(), &pool, 0, 1);
+	assert_pool_status!(header03.hash(), &pool, 0, 1);
 
 	let xt1_status = block_on(xt1_watcher.take(1).collect::<Vec<_>>());
 	assert_eq!(xt1_status, vec![TransactionStatus::Future]);
-- 
GitLab