From 085935dd0a1349b69a5704b34ac7bc7b9ef87a05 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= <tomusdrw@users.noreply.github.com>
Date: Fri, 1 Oct 2021 16:25:13 +0200
Subject: [PATCH] Quickly skip invalid transactions during block authorship.
 (#9789)

* Support skipping invalid transactions in the iterator.

* Expose concrete iterator.

* cargo +nightly fmt --all

* More consistent placement.

* Update Cargo.lock

* Pass transaction to 'report_invalid'
---
 substrate/bin/node/bench/src/construct.rs     |  28 +++--
 .../basic-authorship/src/basic_authorship.rs  |   8 +-
 .../client/transaction-pool/api/src/lib.rs    |  28 ++++-
 .../client/transaction-pool/graph/Cargo.toml  |  39 ------
 .../transaction-pool/src/graph/base_pool.rs   |   4 +-
 .../transaction-pool/src/graph/ready.rs       | 114 +++++++++++++++---
 .../src/graph/validated_pool.rs               |   4 +-
 substrate/client/transaction-pool/src/lib.rs  |   5 +-
 8 files changed, 159 insertions(+), 71 deletions(-)
 delete mode 100644 substrate/client/transaction-pool/graph/Cargo.toml

diff --git a/substrate/bin/node/bench/src/construct.rs b/substrate/bin/node/bench/src/construct.rs
index 1532e02bd3e..ca1a1c18f9e 100644
--- a/substrate/bin/node/bench/src/construct.rs
+++ b/substrate/bin/node/bench/src/construct.rs
@@ -30,8 +30,8 @@ use std::{borrow::Cow, collections::HashMap, pin::Pin, sync::Arc};
 use node_primitives::Block;
 use node_testing::bench::{BenchDb, BlockType, DatabaseType, KeyTypes, Profile};
 use sc_transaction_pool_api::{
-	ImportNotificationStream, PoolFuture, PoolStatus, TransactionFor, TransactionSource,
-	TransactionStatusStreamFor, TxHash,
+	ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor,
+	TransactionSource, TransactionStatusStreamFor, TxHash,
 };
 use sp_consensus::{Environment, Proposer};
 use sp_inherents::InherentDataProvider;
@@ -216,6 +216,19 @@ impl sc_transaction_pool_api::InPoolTransaction for PoolTransaction {
 
 #[derive(Clone, Debug)]
 pub struct Transactions(Vec<Arc<PoolTransaction>>);
+pub struct TransactionsIterator(std::vec::IntoIter<Arc<PoolTransaction>>);
+
+impl Iterator for TransactionsIterator {
+	type Item = Arc<PoolTransaction>;
+
+	fn next(&mut self) -> Option<Self::Item> {
+		self.0.next()
+	}
+}
+
+impl ReadyTransactions for TransactionsIterator {
+	fn report_invalid(&mut self, _tx: &Self::Item) {}
+}
 
 impl sc_transaction_pool_api::TransactionPool for Transactions {
 	type Block = Block;
@@ -257,16 +270,17 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
 		_at: NumberFor<Self::Block>,
 	) -> Pin<
 		Box<
-			dyn Future<Output = Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>>
-				+ Send,
+			dyn Future<
+					Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
+				> + Send,
 		>,
 	> {
-		let iter: Box<dyn Iterator<Item = Arc<PoolTransaction>> + Send> =
-			Box::new(self.0.clone().into_iter());
+		let iter: Box<dyn ReadyTransactions<Item = Arc<PoolTransaction>> + Send> =
+			Box::new(TransactionsIterator(self.0.clone().into_iter()));
 		Box::pin(futures::future::ready(iter))
 	}
 
-	fn ready(&self) -> Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send> {
+	fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
 		unimplemented!()
 	}
 
diff --git a/substrate/client/basic-authorship/src/basic_authorship.rs b/substrate/client/basic-authorship/src/basic_authorship.rs
index e38bb11688f..bbee60ae98d 100644
--- a/substrate/client/basic-authorship/src/basic_authorship.rs
+++ b/substrate/client/basic-authorship/src/basic_authorship.rs
@@ -344,7 +344,7 @@ where
 		let mut t2 =
 			futures_timer::Delay::new(deadline.saturating_duration_since((self.now)()) / 8).fuse();
 
-		let pending_iterator = select! {
+		let mut pending_iterator = select! {
 			res = t1 => res,
 			_ = t2 => {
 				log::warn!(
@@ -363,7 +363,7 @@ where
 		let mut transaction_pushed = false;
 		let mut hit_block_size_limit = false;
 
-		for pending_tx in pending_iterator {
+		while let Some(pending_tx) = pending_iterator.next() {
 			if (self.now)() > deadline {
 				debug!(
 					"Consensus deadline reached when pushing block transactions, \
@@ -378,6 +378,7 @@ where
 			let block_size =
 				block_builder.estimate_block_size(self.include_proof_in_block_size_estimation);
 			if block_size + pending_tx_data.encoded_size() > block_size_limit {
+				pending_iterator.report_invalid(&pending_tx);
 				if skipped < MAX_SKIPPED_TRANSACTIONS {
 					skipped += 1;
 					debug!(
@@ -400,6 +401,7 @@ where
 					debug!("[{:?}] Pushed to the block.", pending_tx_hash);
 				},
 				Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => {
+					pending_iterator.report_invalid(&pending_tx);
 					if skipped < MAX_SKIPPED_TRANSACTIONS {
 						skipped += 1;
 						debug!(
@@ -412,6 +414,7 @@ where
 					}
 				},
 				Err(e) if skipped > 0 => {
+					pending_iterator.report_invalid(&pending_tx);
 					trace!(
 						"[{:?}] Ignoring invalid transaction when skipping: {}",
 						pending_tx_hash,
@@ -419,6 +422,7 @@ where
 					);
 				},
 				Err(e) => {
+					pending_iterator.report_invalid(&pending_tx);
 					debug!("[{:?}] Invalid transaction: {}", pending_tx_hash, e);
 					unqueue_invalid.push(pending_tx_hash);
 				},
diff --git a/substrate/client/transaction-pool/api/src/lib.rs b/substrate/client/transaction-pool/api/src/lib.rs
index a6252f1373c..cd8784bfc83 100644
--- a/substrate/client/transaction-pool/api/src/lib.rs
+++ b/substrate/client/transaction-pool/api/src/lib.rs
@@ -223,13 +223,14 @@ pub trait TransactionPool: Send + Sync {
 		at: NumberFor<Self::Block>,
 	) -> Pin<
 		Box<
-			dyn Future<Output = Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>>
-				+ Send,
+			dyn Future<
+					Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
+				> + Send,
 		>,
 	>;
 
 	/// Get an iterator for ready transactions ordered by priority.
-	fn ready(&self) -> Box<dyn Iterator<Item = Arc<Self::InPoolTransaction>> + Send>;
+	fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>;
 
 	// *** Block production
 	/// Remove transactions identified by given hashes (and dependent transactions) from the pool.
@@ -254,6 +255,27 @@ pub trait TransactionPool: Send + Sync {
 	fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
 }
 
+/// An iterator of ready transactions.
+///
+/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting
+/// last-returned element as invalid.
+///
+/// The implementation is then allowed, for performance reasons, to change the elements
+/// returned next, by e.g.  skipping elements that are known to depend on the reported
+/// transaction, which yields them invalid as well.
+pub trait ReadyTransactions: Iterator {
+	/// Report given transaction as invalid.
+	///
+	/// This might affect subsequent elements returned by the iterator, so dependent transactions
+	/// are skipped for performance reasons.
+	fn report_invalid(&mut self, _tx: &Self::Item);
+}
+
+/// A no-op implementation for an empty iterator.
+impl<T> ReadyTransactions for std::iter::Empty<T> {
+	fn report_invalid(&mut self, _tx: &T) {}
+}
+
 /// Events that the transaction pool listens for.
 pub enum ChainEvent<B: BlockT> {
 	/// New best block have been added to the chain
diff --git a/substrate/client/transaction-pool/graph/Cargo.toml b/substrate/client/transaction-pool/graph/Cargo.toml
deleted file mode 100644
index b49cadc51c3..00000000000
--- a/substrate/client/transaction-pool/graph/Cargo.toml
+++ /dev/null
@@ -1,39 +0,0 @@
-[package]
-name = "sc-transaction-graph"
-version = "4.0.0-dev"
-authors = ["Parity Technologies <admin@parity.io>"]
-edition = "2018"
-license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
-homepage = "https://substrate.dev"
-repository = "https://github.com/paritytech/substrate/"
-description = "Generic Transaction Pool"
-readme = "README.md"
-
-[package.metadata.docs.rs]
-targets = ["x86_64-unknown-linux-gnu"]
-
-[dependencies]
-derive_more = "0.99.2"
-thiserror = "1.0.21"
-futures = "0.3.9"
-log = "0.4.8"
-parking_lot = "0.11.1"
-serde = { version = "1.0.101", features = ["derive"] }
-sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" }
-sc-utils = { version = "4.0.0-dev", path = "../../utils" }
-sp-core = { version = "4.0.0-dev", path = "../../../primitives/core" }
-sp-runtime = { version = "4.0.0-dev", path = "../../../primitives/runtime" }
-sp-transaction-pool = { version = "4.0.0-dev", path = "../../../primitives/transaction-pool" }
-parity-util-mem = { version = "0.10.0", default-features = false, features = ["primitive-types"] }
-linked-hash-map = "0.5.4"
-retain_mut = "0.1.3"
-
-[dev-dependencies]
-assert_matches = "1.3.0"
-codec = { package = "parity-scale-codec", version = "2.0.0" }
-substrate-test-runtime = { version = "2.0.0", path = "../../../test-utils/runtime" }
-criterion = "0.3"
-
-[[bench]]
-name = "basics"
-harness = false
diff --git a/substrate/client/transaction-pool/src/graph/base_pool.rs b/substrate/client/transaction-pool/src/graph/base_pool.rs
index 890a87e8292..2c8becdfb2f 100644
--- a/substrate/client/transaction-pool/src/graph/base_pool.rs
+++ b/substrate/client/transaction-pool/src/graph/base_pool.rs
@@ -36,7 +36,7 @@ use sp_runtime::{
 
 use super::{
 	future::{FutureTransactions, WaitingTransaction},
-	ready::ReadyTransactions,
+	ready::{BestIterator, ReadyTransactions},
 };
 
 /// Successful import result.
@@ -355,7 +355,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash,
 	}
 
 	/// Returns an iterator over ready transactions in the pool.
-	pub fn ready(&self) -> impl Iterator<Item = Arc<Transaction<Hash, Ex>>> {
+	pub fn ready(&self) -> BestIterator<Hash, Ex> {
 		self.ready.get()
 	}
 
diff --git a/substrate/client/transaction-pool/src/graph/ready.rs b/substrate/client/transaction-pool/src/graph/ready.rs
index 03689aeb32e..99a034689cc 100644
--- a/substrate/client/transaction-pool/src/graph/ready.rs
+++ b/substrate/client/transaction-pool/src/graph/ready.rs
@@ -23,7 +23,7 @@ use std::{
 	sync::Arc,
 };
 
-use log::trace;
+use log::{debug, trace};
 use sc_transaction_pool_api::error;
 use serde::Serialize;
 use sp_runtime::{traits::Member, transaction_validity::TransactionTag as Tag};
@@ -156,11 +156,16 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
 	/// - transactions that are valid for a shorter time go first
 	/// 4. Lastly we sort by the time in the queue
 	/// - transactions that are longer in the queue go first
-	pub fn get(&self) -> impl Iterator<Item = Arc<Transaction<Hash, Ex>>> {
+	///
+	/// The iterator is providing a way to report transactions that the receiver considers invalid.
+	/// In such case the entire subgraph of transactions that depend on the reported one will be
+	/// skipped.
+	pub fn get(&self) -> BestIterator<Hash, Ex> {
 		BestIterator {
 			all: self.ready.clone(),
 			best: self.best.clone(),
 			awaiting: Default::default(),
+			invalid: Default::default(),
 		}
 	}
 
@@ -482,6 +487,7 @@ pub struct BestIterator<Hash, Ex> {
 	all: ReadOnlyTrackedMap<Hash, ReadyTx<Hash, Ex>>,
 	awaiting: HashMap<Hash, (usize, TransactionRef<Hash, Ex>)>,
 	best: BTreeSet<TransactionRef<Hash, Ex>>,
+	invalid: HashSet<Hash>,
 }
 
 impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
@@ -498,6 +504,34 @@ impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
 	}
 }
 
+impl<Hash: hash::Hash + Member, Ex> sc_transaction_pool_api::ReadyTransactions
+	for BestIterator<Hash, Ex>
+{
+	fn report_invalid(&mut self, tx: &Self::Item) {
+		BestIterator::report_invalid(self, tx)
+	}
+}
+
+impl<Hash: hash::Hash + Member, Ex> BestIterator<Hash, Ex> {
+	/// Report given transaction as invalid.
+	///
+	/// As a consequence, all values that depend on the invalid one will be skipped.
+	/// When given transaction is not in the pool it has no effect.
+	/// When invoked on a fully drained iterator it has no effect either.
+	pub fn report_invalid(&mut self, tx: &Arc<Transaction<Hash, Ex>>) {
+		if let Some(to_report) = self.all.read().get(&tx.hash) {
+			debug!(
+				target: "txpool",
+				"[{:?}] Reported as invalid. Will skip sub-chains while iterating.",
+				to_report.transaction.transaction.hash
+			);
+			for hash in &to_report.unlocks {
+				self.invalid.insert(hash.clone());
+			}
+		}
+	}
+}
+
 impl<Hash: hash::Hash + Member, Ex> Iterator for BestIterator<Hash, Ex> {
 	type Item = Arc<Transaction<Hash, Ex>>;
 
@@ -505,8 +539,19 @@ impl<Hash: hash::Hash + Member, Ex> Iterator for BestIterator<Hash, Ex> {
 		loop {
 			let best = self.best.iter().next_back()?.clone();
 			let best = self.best.take(&best)?;
+			let hash = &best.transaction.hash;
+
+			// Check if the transaction was marked invalid.
+			if self.invalid.contains(hash) {
+				debug!(
+					target: "txpool",
+					"[{:?}] Skipping invalid child transaction while iterating.",
+					hash
+				);
+				continue
+			}
 
-			let next = self.all.read().get(&best.transaction.hash).cloned();
+			let next = self.all.read().get(hash).cloned();
 			let ready = match next {
 				Some(ready) => ready,
 				// The transaction is not in all, maybe it was removed in the meantime?
@@ -635,10 +680,13 @@ mod tests {
 		assert_eq!(ready.get().count(), 3);
 	}
 
-	#[test]
-	fn should_return_best_transactions_in_correct_order() {
-		// given
-		let mut ready = ReadyTransactions::default();
+	/// Populate the pool, with a graph that looks like so:
+	///
+	/// tx1 -> tx2 \
+	///     ->  ->  tx3
+	///     -> tx4 -> tx5 -> tx6
+	///     -> tx7
+	fn populate_pool(ready: &mut ReadyTransactions<u64, Vec<u8>>) {
 		let mut tx1 = tx(1);
 		tx1.requires.clear();
 		let mut tx2 = tx(2);
@@ -649,11 +697,17 @@ mod tests {
 		tx3.provides = vec![];
 		let mut tx4 = tx(4);
 		tx4.requires = vec![tx1.provides[0].clone()];
-		tx4.provides = vec![];
-		let tx5 = Transaction {
-			data: vec![5],
+		tx4.provides = vec![vec![107]];
+		let mut tx5 = tx(5);
+		tx5.requires = vec![tx4.provides[0].clone()];
+		tx5.provides = vec![vec![108]];
+		let mut tx6 = tx(6);
+		tx6.requires = vec![tx5.provides[0].clone()];
+		tx6.provides = vec![];
+		let tx7 = Transaction {
+			data: vec![7],
 			bytes: 1,
-			hash: 5,
+			hash: 7,
 			priority: 1,
 			valid_till: u64::MAX, // use the max here for testing.
 			requires: vec![tx1.provides[0].clone()],
@@ -663,20 +717,30 @@ mod tests {
 		};
 
 		// when
-		for tx in vec![tx1, tx2, tx3, tx4, tx5] {
-			import(&mut ready, tx).unwrap();
+		for tx in vec![tx1, tx2, tx3, tx7, tx4, tx5, tx6] {
+			import(ready, tx).unwrap();
 		}
 
-		// then
 		assert_eq!(ready.best.len(), 1);
+	}
+
+	#[test]
+	fn should_return_best_transactions_in_correct_order() {
+		// given
+		let mut ready = ReadyTransactions::default();
+		populate_pool(&mut ready);
 
+		// when
 		let mut it = ready.get().map(|tx| tx.data[0]);
 
+		// then
 		assert_eq!(it.next(), Some(1));
 		assert_eq!(it.next(), Some(2));
 		assert_eq!(it.next(), Some(3));
 		assert_eq!(it.next(), Some(4));
 		assert_eq!(it.next(), Some(5));
+		assert_eq!(it.next(), Some(6));
+		assert_eq!(it.next(), Some(7));
 		assert_eq!(it.next(), None);
 	}
 
@@ -725,4 +789,26 @@ mod tests {
 				TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 2 }
 		);
 	}
+
+	#[test]
+	fn should_skip_invalid_transactions_while_iterating() {
+		// given
+		let mut ready = ReadyTransactions::default();
+		populate_pool(&mut ready);
+
+		// when
+		let mut it = ready.get();
+		let data = |tx: &Arc<Transaction<u64, Vec<u8>>>| tx.data[0];
+
+		// then
+		assert_eq!(it.next().as_ref().map(data), Some(1));
+		assert_eq!(it.next().as_ref().map(data), Some(2));
+		assert_eq!(it.next().as_ref().map(data), Some(3));
+		let tx4 = it.next();
+		assert_eq!(tx4.as_ref().map(data), Some(4));
+		// report 4 as invalid, which should skip 5 & 6.
+		it.report_invalid(&tx4.unwrap());
+		assert_eq!(it.next().as_ref().map(data), Some(7));
+		assert_eq!(it.next().as_ref().map(data), None);
+	}
 }
diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs
index e4aad7f342b..dba586adc84 100644
--- a/substrate/client/transaction-pool/src/graph/validated_pool.rs
+++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs
@@ -25,7 +25,7 @@ use std::{
 use futures::channel::mpsc::{channel, Sender};
 use parking_lot::{Mutex, RwLock};
 use retain_mut::RetainMut;
-use sc_transaction_pool_api::{error, PoolStatus};
+use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions};
 use serde::Serialize;
 use sp_runtime::{
 	generic::BlockId,
@@ -630,7 +630,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 	}
 
 	/// Get an iterator for ready transactions ordered by priority
-	pub fn ready(&self) -> impl Iterator<Item = TransactionFor<B>> + Send {
+	pub fn ready(&self) -> impl ReadyTransactions<Item = TransactionFor<B>> + Send {
 		self.pool.read().ready()
 	}
 
diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs
index 6eb5bd2f332..4d355df22d8 100644
--- a/substrate/client/transaction-pool/src/lib.rs
+++ b/substrate/client/transaction-pool/src/lib.rs
@@ -56,7 +56,8 @@ use std::{
 use graph::{ExtrinsicHash, IsValidator};
 use sc_transaction_pool_api::{
 	ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus,
-	TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
+	ReadyTransactions, TransactionFor, TransactionPool, TransactionSource,
+	TransactionStatusStreamFor, TxHash,
 };
 use sp_core::traits::SpawnEssentialNamed;
 use sp_runtime::{
@@ -69,7 +70,7 @@ use crate::metrics::MetricsLink as PrometheusMetrics;
 use prometheus_endpoint::Registry as PrometheusRegistry;
 
 type BoxedReadyIterator<Hash, Data> =
-	Box<dyn Iterator<Item = Arc<graph::base_pool::Transaction<Hash, Data>>> + Send>;
+	Box<dyn ReadyTransactions<Item = Arc<graph::base_pool::Transaction<Hash, Data>>> + Send>;
 
 type ReadyIteratorFor<PoolApi> =
 	BoxedReadyIterator<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;
-- 
GitLab