From c7069de04439ddeb358916da43b0106abb9ac12b Mon Sep 17 00:00:00 2001
From: Robert Habermeier <rphmeier@gmail.com>
Date: Wed, 15 Jan 2020 21:09:27 +0100
Subject: [PATCH] Make Proposer instantiation potentially async. (#4630)

* Make Proposer instantiation potentially async.

* fix node-service test

* fix basic-authority doc-test

* only block once on futures in test

* use async/await
---
 substrate/bin/node/cli/src/service.rs         | 16 ++++++-----
 .../basic-authorship/src/basic_authorship.rs  | 14 ++++++----
 substrate/client/basic-authorship/src/lib.rs  |  7 +++--
 substrate/client/consensus/aura/src/lib.rs    | 14 ++++++----
 substrate/client/consensus/babe/src/lib.rs    |  9 ++++--
 substrate/client/consensus/babe/src/tests.rs  |  9 +++---
 substrate/client/consensus/pow/src/lib.rs     |  2 +-
 substrate/client/consensus/slots/src/lib.rs   | 28 ++++++++++---------
 .../primitives/consensus/common/src/lib.rs    |  7 +++--
 9 files changed, 63 insertions(+), 43 deletions(-)

diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs
index 2f53fb7637d..b1c1759ae8f 100644
--- a/substrate/bin/node/cli/src/service.rs
+++ b/substrate/bin/node/cli/src/service.rs
@@ -535,13 +535,15 @@ mod tests {
 
 				digest.push(<DigestItem as CompatibleDigestItem>::babe_pre_digest(babe_pre_digest));
 
-				let mut proposer = proposer_factory.init(&parent_header).unwrap();
-				let new_block = futures::executor::block_on(proposer.propose(
-					inherent_data,
-					digest,
-					std::time::Duration::from_secs(1),
-					RecordProof::Yes,
-				)).expect("Error making test block").block;
+				let new_block = futures::executor::block_on(async move {
+					let proposer = proposer_factory.init(&parent_header).await;
+					proposer.unwrap().propose(
+						inherent_data,
+						digest,
+						std::time::Duration::from_secs(1),
+						RecordProof::Yes,
+					).await
+				}).expect("Error making test block").block;
 
 				let (new_header, new_body) = new_block.deconstruct();
 				let pre_hash = new_header.hash();
diff --git a/substrate/client/basic-authorship/src/basic_authorship.rs b/substrate/client/basic-authorship/src/basic_authorship.rs
index 543428b0734..576009ef274 100644
--- a/substrate/client/basic-authorship/src/basic_authorship.rs
+++ b/substrate/client/basic-authorship/src/basic_authorship.rs
@@ -34,6 +34,7 @@ use sp_transaction_pool::{TransactionPool, InPoolTransaction};
 use sc_telemetry::{telemetry, CONSENSUS_INFO};
 use sc_block_builder::BlockBuilderApi;
 use sp_api::{ProvideRuntimeApi, ApiExt};
+use futures::prelude::*;
 
 /// Proposer factory.
 pub struct ProposerFactory<C, A> where A: TransactionPool {
@@ -59,7 +60,7 @@ impl<B, E, Block, RA, A> ProposerFactory<SubstrateClient<B, E, Block, RA>, A>
 		&mut self,
 		parent_header: &<Block as BlockT>::Header,
 		now: Box<dyn Fn() -> time::Instant + Send + Sync>,
-	) -> Result<Proposer<Block, SubstrateClient<B, E, Block, RA>, A>, sp_blockchain::Error> {
+	) -> Proposer<Block, SubstrateClient<B, E, Block, RA>, A> {
 		let parent_hash = parent_header.hash();
 
 		let id = BlockId::hash(parent_hash);
@@ -77,7 +78,7 @@ impl<B, E, Block, RA, A> ProposerFactory<SubstrateClient<B, E, Block, RA>, A>
 			}),
 		};
 
-		Ok(proposer)
+		proposer
 	}
 }
 
@@ -94,14 +95,15 @@ impl<B, E, Block, RA, A> sp_consensus::Environment<Block> for
 				BlockBuilderApi<Block, Error = sp_blockchain::Error> +
 				ApiExt<Block, StateBackend = backend::StateBackendFor<B, Block>>,
 {
+	type CreateProposer = future::Ready<Result<Self::Proposer, Self::Error>>;
 	type Proposer = Proposer<Block, SubstrateClient<B, E, Block, RA>, A>;
 	type Error = sp_blockchain::Error;
 
 	fn init(
 		&mut self,
 		parent_header: &<Block as BlockT>::Header,
-	) -> Result<Self::Proposer, sp_blockchain::Error> {
-		self.init_with_now(parent_header, Box::new(time::Instant::now))
+	) -> Self::CreateProposer {
+		future::ready(Ok(self.init_with_now(parent_header, Box::new(time::Instant::now))))
 	}
 }
 
@@ -324,7 +326,7 @@ mod tests {
 				*value = new;
 				old
 			})
-		).unwrap();
+		);
 
 		// when
 		let deadline = time::Duration::from_secs(3);
@@ -359,7 +361,7 @@ mod tests {
 		let mut proposer = proposer_factory.init_with_now(
 			&client.header(&block_id).unwrap().unwrap(),
 			Box::new(move || time::Instant::now()),
-		).unwrap();
+		);
 
 		let deadline = time::Duration::from_secs(9);
 		let proposal = futures::executor::block_on(
diff --git a/substrate/client/basic-authorship/src/lib.rs b/substrate/client/basic-authorship/src/lib.rs
index cf77c8a3f34..5465768e83c 100644
--- a/substrate/client/basic-authorship/src/lib.rs
+++ b/substrate/client/basic-authorship/src/lib.rs
@@ -34,9 +34,12 @@
 //! };
 //!
 //! // From this factory, we create a `Proposer`.
-//! let mut proposer = proposer_factory.init(
+//! let proposer = proposer_factory.init(
 //! 	&client.header(&BlockId::number(0)).unwrap().unwrap(),
-//! ).unwrap();
+//! );
+//!
+//! // The proposer is created asynchronously.
+//! let mut proposer = futures::executor::block_on(proposer).unwrap();
 //!
 //! // This `Proposer` allows us to create a block proposition.
 //! // The proposer will grab transactions from the transaction pool, and put them into the block.
diff --git a/substrate/client/consensus/aura/src/lib.rs b/substrate/client/consensus/aura/src/lib.rs
index c4983797213..61568cea8b6 100644
--- a/substrate/client/consensus/aura/src/lib.rs
+++ b/substrate/client/consensus/aura/src/lib.rs
@@ -217,6 +217,9 @@ impl<B, C, E, I, P, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for AuraW
 {
 	type BlockImport = I;
 	type SyncOracle = SO;
+	type CreateProposer = Pin<Box<
+		dyn Future<Output = Result<E::Proposer, sp_consensus::Error>> + Send + 'static
+	>>;
 	type Proposer = E::Proposer;
 	type Claim = P;
 	type EpochData = Vec<AuthorityId<P>>;
@@ -302,10 +305,10 @@ impl<B, C, E, I, P, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for AuraW
 		&mut self.sync_oracle
 	}
 
-	fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error> {
-		self.env.init(block).map_err(|e| {
+	fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
+		Box::pin(self.env.init(block).map_err(|e| {
 			sp_consensus::Error::ClientImport(format!("{:?}", e)).into()
-		})
+		}))
 	}
 
 	fn proposing_remaining_duration(
@@ -874,12 +877,13 @@ mod tests {
 
 	impl Environment<TestBlock> for DummyFactory {
 		type Proposer = DummyProposer;
+		type CreateProposer = futures::future::Ready<Result<DummyProposer, Error>>;
 		type Error = Error;
 
 		fn init(&mut self, parent_header: &<TestBlock as BlockT>::Header)
-			-> Result<DummyProposer, Error>
+			-> Self::CreateProposer
 		{
-			Ok(DummyProposer(parent_header.number + 1, self.0.clone()))
+			futures::future::ready(Ok(DummyProposer(parent_header.number + 1, self.0.clone())))
 		}
 	}
 
diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs
index 4eb1e3915b6..770f8a4eec6 100644
--- a/substrate/client/consensus/babe/src/lib.rs
+++ b/substrate/client/consensus/babe/src/lib.rs
@@ -363,6 +363,9 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
 	type EpochData = Epoch;
 	type Claim = (BabePreDigest, AuthorityPair);
 	type SyncOracle = SO;
+	type CreateProposer = Pin<Box<
+		dyn Future<Output = Result<E::Proposer, sp_consensus::Error>> + Send + 'static
+	>>;
 	type Proposer = E::Proposer;
 	type BlockImport = I;
 
@@ -466,10 +469,10 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
 		&mut self.sync_oracle
 	}
 
-	fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error> {
-		self.env.init(block).map_err(|e| {
+	fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
+		Box::pin(self.env.init(block).map_err(|e| {
 			sp_consensus::Error::ClientImport(format!("{:?}", e))
-		})
+		}))
 	}
 
 	fn proposing_remaining_duration(
diff --git a/substrate/client/consensus/babe/src/tests.rs b/substrate/client/consensus/babe/src/tests.rs
index 305c8939826..880e028000b 100644
--- a/substrate/client/consensus/babe/src/tests.rs
+++ b/substrate/client/consensus/babe/src/tests.rs
@@ -72,23 +72,24 @@ struct DummyProposer {
 }
 
 impl Environment<TestBlock> for DummyFactory {
+	type CreateProposer = future::Ready<Result<DummyProposer, Error>>;
 	type Proposer = DummyProposer;
 	type Error = Error;
 
 	fn init(&mut self, parent_header: &<TestBlock as BlockT>::Header)
-		-> Result<DummyProposer, Error>
+		-> Self::CreateProposer
 	{
 
 		let parent_slot = crate::find_pre_digest::<TestBlock>(parent_header)
 			.expect("parent header has a pre-digest")
 			.slot_number();
 
-		Ok(DummyProposer {
+		future::ready(Ok(DummyProposer {
 			factory: self.clone(),
 			parent_hash: parent_header.hash(),
 			parent_number: *parent_header.number(),
 			parent_slot,
-		})
+		}))
 	}
 }
 
@@ -547,7 +548,7 @@ fn propose_and_import_block<Transaction>(
 	proposer_factory: &mut DummyFactory,
 	block_import: &mut BoxBlockImport<TestBlock, Transaction>,
 ) -> sp_core::H256 {
-	let mut proposer = proposer_factory.init(parent).unwrap();
+	let mut proposer = futures::executor::block_on(proposer_factory.init(parent)).unwrap();
 
 	let slot_number = slot_number.unwrap_or_else(|| {
 		let parent_pre_digest = find_pre_digest::<TestBlock>(parent).unwrap();
diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs
index 5c491057e57..2ea0cbdf0f7 100644
--- a/substrate/client/consensus/pow/src/lib.rs
+++ b/substrate/client/consensus/pow/src/lib.rs
@@ -486,7 +486,7 @@ fn mine_loop<B: BlockT, C, Algorithm, E, SO, S, CAW>(
 		}
 
 		let mut aux = PowAux::read(client, &best_hash)?;
-		let mut proposer = env.init(&best_header)
+		let mut proposer = futures::executor::block_on(env.init(&best_header))
 			.map_err(|e| Error::Environment(format!("{:?}", e)))?;
 
 		let inherent_data = inherent_data_providers
diff --git a/substrate/client/consensus/slots/src/lib.rs b/substrate/client/consensus/slots/src/lib.rs
index a69c710a7e9..3aa243af72b 100644
--- a/substrate/client/consensus/slots/src/lib.rs
+++ b/substrate/client/consensus/slots/src/lib.rs
@@ -70,6 +70,10 @@ pub trait SimpleSlotWorker<B: BlockT> {
 	/// A handle to a `SyncOracle`.
 	type SyncOracle: SyncOracle;
 
+	/// The type of future resolving to the proposer.
+	type CreateProposer: Future<Output = Result<Self::Proposer, sp_consensus::Error>>
+		+ Send + Unpin + 'static;
+
 	/// The type of proposer to use to build blocks.
 	type Proposer: Proposer<B>;
 
@@ -129,7 +133,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
 	fn sync_oracle(&mut self) -> &mut Self::SyncOracle;
 
 	/// Returns a `Proposer` to author on top of the given block.
-	fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error>;
+	fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer;
 
 	/// Remaining duration of the slot.
 	fn slot_remaining_duration(&self, slot_info: &SlotInfo) -> Duration {
@@ -216,32 +220,30 @@ pub trait SimpleSlotWorker<B: BlockT> {
 			"timestamp" => timestamp,
 		);
 
-		let mut proposer = match self.proposer(&chain_head) {
-			Ok(proposer) => proposer,
-			Err(err) => {
-				warn!("Unable to author block in slot {:?}: {:?}", slot_number, err);
+		let awaiting_proposer = self.proposer(&chain_head).map_err(move |err| {
+			warn!("Unable to author block in slot {:?}: {:?}", slot_number, err);
 
-				telemetry!(CONSENSUS_WARN; "slots.unable_authoring_block";
-					"slot" => slot_number, "err" => ?err
-				);
+			telemetry!(CONSENSUS_WARN; "slots.unable_authoring_block";
+				"slot" => slot_number, "err" => ?err
+			);
 
-				return Box::pin(future::ready(Ok(())));
-			},
-		};
+			err
+		});
 
 		let slot_remaining_duration = self.slot_remaining_duration(&slot_info);
 		let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info);
 		let logs = self.pre_digest_data(slot_number, &claim);
 
 		// deadline our production to approx. the end of the slot
-		let proposing = proposer.propose(
+		let proposing = awaiting_proposer.and_then(move |mut proposer| proposer.propose(
 			slot_info.inherent_data,
 			sp_runtime::generic::Digest {
 				logs,
 			},
 			slot_remaining_duration,
 			RecordProof::No,
-		).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)));
+		).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e))));
+
 		let delay: Box<dyn Future<Output=()> + Unpin + Send> = match proposing_remaining_duration {
 			Some(r) => Box::new(Delay::new(r)),
 			None => Box::new(future::pending()),
diff --git a/substrate/primitives/consensus/common/src/lib.rs b/substrate/primitives/consensus/common/src/lib.rs
index 1b98ede3768..4927faede06 100644
--- a/substrate/primitives/consensus/common/src/lib.rs
+++ b/substrate/primitives/consensus/common/src/lib.rs
@@ -74,13 +74,16 @@ pub enum BlockStatus {
 /// Environment producer for a Consensus instance. Creates proposer instance and communication streams.
 pub trait Environment<B: BlockT> {
 	/// The proposer type this creates.
-	type Proposer: Proposer<B> + 'static;
+	type Proposer: Proposer<B> + Send + 'static;
+	/// A future that resolves to the proposer.
+	type CreateProposer: Future<Output = Result<Self::Proposer, Self::Error>>
+		+ Send + Unpin + 'static;
 	/// Error which can occur upon creation.
 	type Error: From<Error> + std::fmt::Debug + 'static;
 
 	/// Initialize the proposal logic on top of a specific header. Provide
 	/// the authorities at that header.
-	fn init(&mut self, parent_header: &B::Header) -> Result<Self::Proposer, Self::Error>;
+	fn init(&mut self, parent_header: &B::Header) -> Self::CreateProposer;
 }
 
 /// A proposal that is created by a [`Proposer`].
-- 
GitLab