From 1614ce0996bfb367545d3058cf8f1bf37dfe2753 Mon Sep 17 00:00:00 2001
From: Nikolay Volf <nikvolf@gmail.com>
Date: Fri, 24 Jan 2020 05:22:39 -0800
Subject: [PATCH] Async/await in transaction-graph (#4645)

* async/await in tx graph

* review notes

* remove unused typedef
---
 .../client/transaction-pool/graph/src/pool.rs | 229 ++++++++----------
 substrate/client/transaction-pool/src/lib.rs  |  34 ++-
 .../primitives/transaction-pool/src/pool.rs   |  37 +--
 3 files changed, 144 insertions(+), 156 deletions(-)

diff --git a/substrate/client/transaction-pool/graph/src/pool.rs b/substrate/client/transaction-pool/graph/src/pool.rs
index 5be879f079a..d0f14ad40ad 100644
--- a/substrate/client/transaction-pool/graph/src/pool.rs
+++ b/substrate/client/transaction-pool/graph/src/pool.rs
@@ -27,7 +27,6 @@ use serde::Serialize;
 use futures::{
 	Future, FutureExt,
 	channel::mpsc,
-	future::{Either, ready, join_all},
 };
 use sp_runtime::{
 	generic::BlockId,
@@ -132,8 +131,8 @@ impl<B: ChainApi> Pool<B> {
 	}
 
 	/// Imports a bunch of unverified extrinsics to the pool
-	pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T, force: bool)
-		-> impl Future<Output=Result<Vec<Result<ExHash<B>, B::Error>>, B::Error>>
+	pub async fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T, force: bool)
+		-> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error>
 	where
 		T: IntoIterator<Item=ExtrinsicFor<B>>
 	{
@@ -143,48 +142,43 @@ impl<B: ChainApi> Pool<B> {
 				.map(|validated_transactions| validated_pool.submit(validated_transactions
 					.into_iter()
 					.map(|(_, tx)| tx))))
+			.await
 	}
 
 	/// Imports one unverified extrinsic to the pool
-	pub fn submit_one(
+	pub async fn submit_one(
 		&self,
 		at: &BlockId<B::Block>,
 		xt: ExtrinsicFor<B>,
-	) -> impl Future<Output=Result<ExHash<B>, B::Error>> {
+	) -> Result<ExHash<B>, B::Error> {
 		self.submit_at(at, std::iter::once(xt), false)
 			.map(|import_result| import_result.and_then(|mut import_result| import_result
 				.pop()
 				.expect("One extrinsic passed; one result returned; qed")
 			))
+			.await
 	}
 
 	/// Import a single extrinsic and starts to watch their progress in the pool.
-	pub fn submit_and_watch(
+	pub async fn submit_and_watch(
 		&self,
 		at: &BlockId<B::Block>,
 		xt: ExtrinsicFor<B>,
-	) -> impl Future<Output=Result<Watcher<ExHash<B>, BlockHash<B>>, B::Error>> {
-		let block_number = match self.resolve_block_number(at) {
-			Ok(block_number) => block_number,
-			Err(err) => return Either::Left(ready(Err(err)))
-		};
-
-		let validated_pool = self.validated_pool.clone();
-		Either::Right(
-			self.verify_one(at, block_number, xt, false)
-				.map(move |validated_transactions| validated_pool.submit_and_watch(validated_transactions.1))
-		)
+	) -> Result<Watcher<ExHash<B>, BlockHash<B>>, B::Error> {
+		let block_number = self.resolve_block_number(at)?;
+		let (_, tx) = self.verify_one(at, block_number, xt, false).await;
+		self.validated_pool.submit_and_watch(tx)
 	}
 
 	/// Revalidate all ready transactions.
 	///
 	/// Returns future that performs validation of all ready transactions and
 	/// then resubmits all transactions back to the pool.
-	pub fn revalidate_ready(
+	pub async fn revalidate_ready(
 		&self,
 		at: &BlockId<B::Block>,
 		max: Option<usize>,
-	) -> impl Future<Output=Result<(), B::Error>> {
+	) -> Result<(), B::Error> {
 		use std::time::Instant;
 		log::debug!(target: "txpool",
 			"Fetching ready transactions (up to: {})",
@@ -196,23 +190,20 @@ impl<B: ChainApi> Pool<B> {
 			.take(max.unwrap_or_else(usize::max_value));
 
 		let now = Instant::now();
-		self.verify(at, ready, false)
-			.map(move |revalidated_transactions| {
-				log::debug!(target: "txpool",
-					"Re-verified transactions, took {} ms. Resubmitting.",
-					now.elapsed().as_millis()
-				);
-				let now = Instant::now();
-				let res = revalidated_transactions.map(
-					|revalidated_transactions| validated_pool.resubmit(revalidated_transactions)
-				);
-				log::debug!(target: "txpool",
-					"Resubmitted. Took {} ms. Status: {:?}",
-					now.elapsed().as_millis(),
-					validated_pool.status()
-				);
-				res
-			})
+		let revalidated_transactions = self.verify(at, ready, false).await?;
+		log::debug!(target: "txpool",
+			"Re-verified transactions, took {} ms. Resubmitting.",
+			now.elapsed().as_millis()
+		);
+
+		let now = Instant::now();
+		self.validated_pool.resubmit(revalidated_transactions);
+		log::debug!(target: "txpool",
+			"Resubmitted. Took {} ms. Status: {:?}",
+			now.elapsed().as_millis(),
+			validated_pool.status()
+		);
+		Ok(())
 	}
 
 	/// Prunes known ready transactions.
@@ -238,12 +229,12 @@ impl<B: ChainApi> Pool<B> {
 	/// To perform pruning we need the tags that each extrinsic provides and to avoid calling
 	/// into runtime too often we first lookup all extrinsics that are in the pool and get
 	/// their provided tags from there. Otherwise we query the runtime at the `parent` block.
-	pub fn prune(
+	pub async fn prune(
 		&self,
 		at: &BlockId<B::Block>,
 		parent: &BlockId<B::Block>,
 		extrinsics: &[ExtrinsicFor<B>],
-	) -> impl Future<Output=Result<(), B::Error>> {
+	) -> Result<(), B::Error> {
 		log::debug!(
 			target: "txpool",
 			"Starting pruning of block {:?} (extrinsics: {})",
@@ -257,34 +248,26 @@ impl<B: ChainApi> Pool<B> {
 		// Zip the ones from the pool with the full list (we get pairs `(Extrinsic, Option<Vec<Tag>>)`)
 		let all = extrinsics.iter().zip(in_pool_tags.into_iter());
 
-		// Prepare future that collect tags for all extrinsics
-		let future_tags = join_all(all
-			.map(|(extrinsic, in_pool_tags)|
-				match in_pool_tags {
-					// reuse the tags for extrinsics that were found in the pool
-					Some(tags) => Either::Left(
-						ready(tags)
-					),
-					// if it's not found in the pool query the runtime at parent block
-					// to get validity info and tags that the extrinsic provides.
-					None => Either::Right(self.validated_pool.api().validate_transaction(parent, extrinsic.clone())
-						.then(|validity| ready(match validity {
-							Ok(Ok(validity)) => validity.provides,
-							// silently ignore invalid extrinsics,
-							// cause they might just be inherent
-							_ => Vec::new(),
-						}))),
-				}
-			));
+		let mut future_tags = Vec::new();
+		for (extrinsic, in_pool_tags) in all {
+			match in_pool_tags {
+				// reuse the tags for extrinsics that were found in the pool
+				Some(tags) => future_tags.extend(tags),
+				// if it's not found in the pool query the runtime at parent block
+				// to get validity info and tags that the extrinsic provides.
+				None => {
+					let validity = self.validated_pool.api()
+						.validate_transaction(parent, extrinsic.clone())
+						.await;
+
+					if let Ok(Ok(validity)) = validity {
+						future_tags.extend(validity.provides);
+					}
+				},
+			}
+		}
 
-		// Prune transactions by tags
-		let at = at.clone();
-		let self_clone = self.clone();
-		future_tags.then(move |tags| self_clone.prune_tags(
-			&at,
-			tags.into_iter().flat_map(|tags| tags),
-			in_pool_hashes,
-		))
+		self.prune_tags(at, future_tags, in_pool_hashes).await
 	}
 
 	/// Prunes ready transactions that provide given list of tags.
@@ -308,17 +291,17 @@ impl<B: ChainApi> Pool<B> {
 	/// the second parameter of `known_imported_hashes`. These transactions
 	/// (if pruned) are not revalidated and become temporarily banned to
 	/// prevent importing them in the (near) future.
-	pub fn prune_tags(
+	pub async fn prune_tags(
 		&self,
 		at: &BlockId<B::Block>,
 		tags: impl IntoIterator<Item=Tag>,
 		known_imported_hashes: impl IntoIterator<Item=ExHash<B>> + Clone,
-	) -> impl Future<Output=Result<(), B::Error>> {
+	) -> Result<(), B::Error> {
 		log::debug!(target: "txpool", "Pruning at {:?}", at);
 		// Prune all transactions that provide given tags
 		let prune_status = match self.validated_pool.prune_tags(tags) {
 			Ok(prune_status) => prune_status,
-			Err(e) => return Either::Left(ready(Err(e))),
+			Err(e) => return Err(e),
 		};
 
 		// Make sure that we don't revalidate extrinsics that were part of the recently
@@ -330,21 +313,18 @@ impl<B: ChainApi> Pool<B> {
 		// note that `known_imported_hashes` will be rejected here due to temporary ban.
 		let pruned_hashes = prune_status.pruned.iter().map(|tx| tx.hash.clone()).collect::<Vec<_>>();
 		let pruned_transactions = prune_status.pruned.into_iter().map(|tx| tx.data.clone());
-		let reverify_future = self.verify(at, pruned_transactions, false);
+
+		let reverified_transactions = self.verify(at, pruned_transactions, false).await?;
 
 		log::trace!(target: "txpool", "Prunning at {:?}. Resubmitting transactions.", at);
 		// And finally - submit reverified transactions back to the pool
-		let at = at.clone();
-		let validated_pool = self.validated_pool.clone();
-		Either::Right(reverify_future.then(move |reverified_transactions|
-			ready(reverified_transactions.and_then(|reverified_transactions|
-				validated_pool.resubmit_pruned(
-					&at,
-					known_imported_hashes,
-					pruned_hashes,
-					reverified_transactions.into_iter().map(|(_, xt)| xt).collect(),
-				))
-			)))
+
+		self.validated_pool.resubmit_pruned(
+			&at,
+			known_imported_hashes,
+			pruned_hashes,
+			reverified_transactions.into_iter().map(|(_, xt)| xt).collect(),
+		)
 	}
 
 	/// Return an event stream of notifications for when transactions are imported to the pool.
@@ -388,69 +368,74 @@ impl<B: ChainApi> Pool<B> {
 	}
 
 	/// Returns future that validates a bunch of transactions at given block.
-	fn verify(
+	async fn verify(
 		&self,
 		at: &BlockId<B::Block>,
 		xts: impl IntoIterator<Item=ExtrinsicFor<B>>,
 		force: bool,
-	) -> impl Future<Output=Result<HashMap<ExHash<B>, ValidatedTransactionFor<B>>, B::Error>> {
+	) -> Result<HashMap<ExHash<B>, ValidatedTransactionFor<B>>, B::Error> {
 		// we need a block number to compute tx validity
-		let block_number = match self.resolve_block_number(at) {
-			Ok(block_number) => block_number,
-			Err(err) => return Either::Left(ready(Err(err))),
-		};
+		let block_number = self.resolve_block_number(at)?;
+		let mut result = HashMap::new();
 
-		// for each xt, prepare a validation future
-		let validation_futures = xts.into_iter().map(move |xt|
-			self.verify_one(at, block_number, xt, force)
-		);
+		for xt in xts {
+			let (hash, validated_tx) = self.verify_one(at, block_number, xt, force).await;
+			result.insert(hash, validated_tx);
+		}
 
-		// make single validation future that waits all until all extrinsics are validated
-		Either::Right(join_all(validation_futures).then(|x| ready(Ok(x.into_iter().collect()))))
+		Ok(result)
 	}
 
 	/// Returns future that validates single transaction at given block.
-	fn verify_one(
+	async fn verify_one(
 		&self,
 		block_id: &BlockId<B::Block>,
 		block_number: NumberFor<B>,
 		xt: ExtrinsicFor<B>,
 		force: bool,
-	) -> impl Future<Output=(ExHash<B>, ValidatedTransactionFor<B>)> {
+	) -> (ExHash<B>, ValidatedTransactionFor<B>) {
 		let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
 		if !force && self.validated_pool.is_banned(&hash) {
-			return Either::Left(ready((
+			return (
 				hash.clone(),
 				ValidatedTransaction::Invalid(hash, error::Error::TemporarilyBanned.into()),
-			)))
+			)
 		}
 
-		Either::Right(self.validated_pool.api().validate_transaction(block_id, xt.clone())
-			.then(move |validation_result| ready((hash.clone(), match validation_result {
-				Ok(validity) => match validity {
-					Ok(validity) => if validity.provides.is_empty() {
-						ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
-					} else {
-						ValidatedTransaction::Valid(base::Transaction {
-							data: xt,
-							bytes,
-							hash,
-							priority: validity.priority,
-							requires: validity.requires,
-							provides: validity.provides,
-							propagate: validity.propagate,
-							valid_till: block_number
-								.saturated_into::<u64>()
-								.saturating_add(validity.longevity),
-						})
-					},
-					Err(TransactionValidityError::Invalid(e)) =>
-						ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into()),
-					Err(TransactionValidityError::Unknown(e)) =>
-						ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into()),
-				},
-				Err(e) => ValidatedTransaction::Invalid(hash, e),
-			}))))
+		let validation_result = self.validated_pool.api().validate_transaction(block_id, xt.clone()).await;
+
+		let status = match validation_result {
+			Ok(status) => status,
+			Err(e) => return (hash.clone(), ValidatedTransaction::Invalid(hash, e)),
+		};
+
+		let validity = match status {
+			Ok(validity) => {
+				if validity.provides.is_empty() {
+					ValidatedTransaction::Invalid(hash.clone(), error::Error::NoTagsProvided.into())
+				} else {
+					ValidatedTransaction::Valid(base::Transaction {
+						data: xt,
+						bytes,
+						hash: hash.clone(),
+						priority: validity.priority,
+						requires: validity.requires,
+						provides: validity.provides,
+						propagate: validity.propagate,
+						valid_till: block_number
+							.saturated_into::<u64>()
+							.saturating_add(validity.longevity),
+					})
+				}
+			},
+
+			Err(TransactionValidityError::Invalid(e)) =>
+				ValidatedTransaction::Invalid(hash.clone(), error::Error::InvalidTransaction(e).into()),
+			Err(TransactionValidityError::Unknown(e)) =>
+				ValidatedTransaction::Unknown(hash.clone(), error::Error::UnknownTransaction(e).into()),
+		};
+
+		(hash, validity)
 	}
 }
 
diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs
index f6f7774935b..b085dba279c 100644
--- a/substrate/client/transaction-pool/src/lib.rs
+++ b/substrate/client/transaction-pool/src/lib.rs
@@ -39,9 +39,11 @@ use sp_runtime::{
 use sp_transaction_pool::{
 	TransactionPool, PoolStatus, ImportNotificationStream,
 	TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash,
-	MaintainedTransactionPool,
+	MaintainedTransactionPool, PoolFuture,
 };
 
+type PoolResult<T> = PoolFuture<T, error::Error>;
+
 /// Basic implementation of transaction pool that can be customized by providing PoolApi.
 pub struct BasicPool<PoolApi, Block>
 	where
@@ -129,28 +131,40 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
 	fn submit_at(
 		&self,
 		at: &BlockId<Self::Block>,
-		xts: impl IntoIterator<Item=TransactionFor<Self>> + 'static,
-	) -> Box<dyn Future<Output=Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>> + Send + Unpin> {
-		Box::new(self.pool.submit_at(at, xts, false))
+		xts: Vec<TransactionFor<Self>>,
+	) -> PoolResult<Vec<Result<TxHash<Self>, Self::Error>>> {
+		let pool = self.pool.clone();
+		let at = *at;
+		async move {
+			pool.submit_at(&at, xts, false).await
+		}.boxed()
 	}
 
 	fn submit_one(
 		&self,
 		at: &BlockId<Self::Block>,
 		xt: TransactionFor<Self>,
-	) -> Box<dyn Future<Output=Result<TxHash<Self>, Self::Error>> + Send + Unpin> {
-		Box::new(self.pool.submit_one(at, xt))
+	) -> PoolResult<TxHash<Self>> {
+		let pool = self.pool.clone();
+		let at = *at;
+		async move {
+			pool.submit_one(&at, xt).await
+		}.boxed()
 	}
 
 	fn submit_and_watch(
 		&self,
 		at: &BlockId<Self::Block>,
 		xt: TransactionFor<Self>,
-	) -> Box<dyn Future<Output=Result<Box<TransactionStatusStreamFor<Self>>, Self::Error>> + Send + Unpin> {
-		Box::new(
-			self.pool.submit_and_watch(at, xt)
+	) -> PoolResult<Box<TransactionStatusStreamFor<Self>>> {
+		let at = *at;
+		let pool = self.pool.clone();
+
+		async move {
+			pool.submit_and_watch(&at, xt)
 				.map(|result| result.map(|watcher| Box::new(watcher.into_stream()) as _))
-		)
+				.await
+		}.boxed()
 	}
 
 	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
diff --git a/substrate/primitives/transaction-pool/src/pool.rs b/substrate/primitives/transaction-pool/src/pool.rs
index ed24ad0619a..92a0c4b5c38 100644
--- a/substrate/primitives/transaction-pool/src/pool.rs
+++ b/substrate/primitives/transaction-pool/src/pool.rs
@@ -124,6 +124,9 @@ pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsi
 /// Type of transactions event stream for a pool.
 pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
 
+/// Typical future type used in transaction pool api.
+pub type PoolFuture<T, E> = std::pin::Pin<Box<dyn Future<Output=Result<T, E>> + Send>>;
+
 /// In-pool transaction interface.
 ///
 /// The pool is container of transactions that are implementing this trait.
@@ -170,55 +173,41 @@ pub trait TransactionPool: Send + Sync {
 	fn submit_at(
 		&self,
 		at: &BlockId<Self::Block>,
-		xts: impl IntoIterator<Item=TransactionFor<Self>> + 'static,
-	) -> Box<dyn Future<Output=Result<
-		Vec<Result<TxHash<Self>, Self::Error>>,
-		Self::Error
-	>> + Send + Unpin>;
+		xts: Vec<TransactionFor<Self>>,
+	) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error>;
 
 	/// Returns a future that imports one unverified transaction to the pool.
 	fn submit_one(
 		&self,
 		at: &BlockId<Self::Block>,
 		xt: TransactionFor<Self>,
-	) -> Box<dyn Future<Output=Result<
-		TxHash<Self>,
-		Self::Error
-	>> + Send + Unpin>;
-
-	// RPC
+	) -> PoolFuture<TxHash<Self>, Self::Error>;
 
+	// *** RPC
 	/// Returns a future that import a single transaction and starts to watch their progress in the pool.
 	fn submit_and_watch(
 		&self,
 		at: &BlockId<Self::Block>,
 		xt: TransactionFor<Self>,
-	) -> Box<dyn Future<Output=Result<Box<TransactionStatusStreamFor<Self>>, Self::Error>> + Send + Unpin>;
-
-
-	// Block production / Networking
+	) -> PoolFuture<Box<TransactionStatusStreamFor<Self>>, Self::Error>;
 
+	// *** Block production / Networking
 	/// Get an iterator for ready transactions ordered by priority
 	fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>>>;
 
-
-	// Block production
-
+	// *** Block production
 	/// Remove transactions identified by given hashes (and dependent transactions) from the pool.
 	fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>>;
 
-	// logging
-
+	// *** logging
 	/// Returns pool status.
 	fn status(&self) -> PoolStatus;
 
-	// logging / RPC / networking
-
+	// *** logging / RPC / networking
 	/// Return an event stream of transactions imported to the pool.
 	fn import_notification_stream(&self) -> ImportNotificationStream;
 
-	// networking
-
+	// *** networking
 	/// Notify the pool about transactions broadcast.
 	fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>);
 
-- 
GitLab