From 67bdfc7d8e07a8e2f979c00d0716c873408df2a4 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Wed, 5 Jun 2019 16:31:20 +0200
Subject: [PATCH] Small slots refactor (#2780)

* Deprecate SlotWorker::on_start

* start_slot_worker no longer needs an Arc

* start_slot_worker now always succeeds

* Removed on_exit parameter from start_*_worker

* Minor doc

* Fix node-template
---
 substrate/core/consensus/aura/src/lib.rs  |  29 ++---
 substrate/core/consensus/babe/src/lib.rs  |  31 ++----
 substrate/core/consensus/slots/src/lib.rs | 124 +++++++++-------------
 substrate/node-template/src/service.rs    |   7 +-
 substrate/node/cli/src/service.rs         |   7 +-
 5 files changed, 77 insertions(+), 121 deletions(-)

diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs
index 36561645f07..e3fffad33fb 100644
--- a/substrate/core/consensus/aura/src/lib.rs
+++ b/substrate/core/consensus/aura/src/lib.rs
@@ -65,7 +65,7 @@ use srml_aura::{
 };
 use substrate_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_WARN, CONSENSUS_INFO};
 
-use slots::{CheckedHeader, SlotWorker, SlotInfo, SlotCompatible, slot_now, check_equivocation};
+use slots::{CheckedHeader, SlotData, SlotWorker, SlotInfo, SlotCompatible, slot_now, check_equivocation};
 
 pub use aura_primitives::*;
 pub use consensus_common::{SyncOracle, ExtraVerification};
@@ -125,7 +125,7 @@ impl SlotCompatible for AuraSlotCompatible {
 }
 
 /// Start the aura worker. The returned future should be run in a tokio runtime.
-pub fn start_aura<B, C, SC, E, I, P, SO, Error, OnExit, H>(
+pub fn start_aura<B, C, SC, E, I, P, SO, Error, H>(
 	slot_duration: SlotDuration,
 	local_key: Arc<P>,
 	client: Arc<C>,
@@ -133,7 +133,6 @@ pub fn start_aura<B, C, SC, E, I, P, SO, Error, OnExit, H>(
 	block_import: Arc<I>,
 	env: Arc<E>,
 	sync_oracle: SO,
-	on_exit: OnExit,
 	inherent_data_providers: InherentDataProviders,
 	force_authoring: bool,
 ) -> Result<impl Future<Item=(), Error=()>, consensus_common::Error> where
@@ -156,25 +155,26 @@ pub fn start_aura<B, C, SC, E, I, P, SO, Error, OnExit, H>(
 	I: BlockImport<B> + Send + Sync + 'static,
 	Error: ::std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
 	SO: SyncOracle + Send + Sync + Clone,
-	OnExit: Future<Item=(), Error=()>,
 {
 	let worker = AuraWorker {
 		client: client.clone(),
 		block_import,
 		env,
 		local_key,
-		inherent_data_providers: inherent_data_providers.clone(),
 		sync_oracle: sync_oracle.clone(),
 		force_authoring,
 	};
-	slots::start_slot_worker::<_, _, _, _, _, AuraSlotCompatible, _>(
+	register_aura_inherent_data_provider(
+		&inherent_data_providers,
+		slot_duration.0.slot_duration()
+	)?;
+	Ok(slots::start_slot_worker::<_, _, _, _, _, AuraSlotCompatible>(
 		slot_duration.0,
 		select_chain,
-		Arc::new(worker),
+		worker,
 		sync_oracle,
-		on_exit,
 		inherent_data_providers
-	)
+	))
 }
 
 struct AuraWorker<C, E, I, P, SO> {
@@ -183,7 +183,6 @@ struct AuraWorker<C, E, I, P, SO> {
 	env: Arc<E>,
 	local_key: Arc<P>,
 	sync_oracle: SO,
-	inherent_data_providers: InherentDataProviders,
 	force_authoring: bool,
 }
 
@@ -208,13 +207,6 @@ impl<H, B, C, E, I, P, Error, SO> SlotWorker<B> for AuraWorker<C, E, I, P, SO> w
 {
 	type OnSlot = Box<dyn Future<Item=(), Error=consensus_common::Error> + Send>;
 
-	fn on_start(
-		&self,
-		slot_duration: u64
-	) -> Result<(), consensus_common::Error> {
-		register_aura_inherent_data_provider(&self.inherent_data_providers, slot_duration)
-	}
-
 	fn on_slot(
 		&self,
 		chain_head: B::Header,
@@ -902,7 +894,7 @@ mod tests {
 				&inherent_data_providers, slot_duration.get()
 			).expect("Registers aura inherent data provider");
 
-			let aura = start_aura::<_, _, _, _, _, sr25519::Pair, _, _, _, _>(
+			let aura = start_aura::<_, _, _, _, _, sr25519::Pair, _, _, _>(
 				slot_duration,
 				Arc::new(key.clone().into()),
 				client.clone(),
@@ -910,7 +902,6 @@ mod tests {
 				client,
 				environ.clone(),
 				DummyOracle,
-				futures::empty(),
 				inherent_data_providers,
 				false,
 			).expect("Starts aura");
diff --git a/substrate/core/consensus/babe/src/lib.rs b/substrate/core/consensus/babe/src/lib.rs
index 0cb95c02df7..c66eb681377 100644
--- a/substrate/core/consensus/babe/src/lib.rs
+++ b/substrate/core/consensus/babe/src/lib.rs
@@ -82,7 +82,7 @@ use futures::{Future, IntoFuture, future};
 use tokio::timer::Timeout;
 use log::{error, warn, debug, info, trace};
 
-use slots::{SlotWorker, SlotInfo, SlotCompatible, slot_now};
+use slots::{SlotWorker, SlotData, SlotInfo, SlotCompatible, slot_now};
 
 
 /// A slot duration. Create with `get_or_compute`.
@@ -134,7 +134,7 @@ impl SlotCompatible for BabeSlotCompatible {
 }
 
 /// Parameters for BABE.
-pub struct BabeParams<C, E, I, SO, SC, OnExit> {
+pub struct BabeParams<C, E, I, SO, SC> {
 
 	/// The configuration for BABE.  Includes the slot duration, threshold, and
 	/// other parameters.
@@ -158,9 +158,6 @@ pub struct BabeParams<C, E, I, SO, SC, OnExit> {
 	/// A sync oracle
 	pub sync_oracle: SO,
 
-	/// Exit callback.
-	pub on_exit: OnExit,
-
 	/// Providers for inherent data.
 	pub inherent_data_providers: InherentDataProviders,
 
@@ -169,7 +166,7 @@ pub struct BabeParams<C, E, I, SO, SC, OnExit> {
 }
 
 /// Start the babe worker. The returned future should be run in a tokio runtime.
-pub fn start_babe<B, C, SC, E, I, SO, Error, OnExit, H>(BabeParams {
+pub fn start_babe<B, C, SC, E, I, SO, Error, H>(BabeParams {
 	config,
 	local_key,
 	client,
@@ -177,10 +174,9 @@ pub fn start_babe<B, C, SC, E, I, SO, Error, OnExit, H>(BabeParams {
 	block_import,
 	env,
 	sync_oracle,
-	on_exit,
 	inherent_data_providers,
 	force_authoring,
-}: BabeParams<C, E, I, SO, SC, OnExit>) -> Result<
+}: BabeParams<C, E, I, SO, SC>) -> Result<
 	impl Future<Item=(), Error=()>,
 	consensus_common::Error,
 > where
@@ -200,26 +196,24 @@ pub fn start_babe<B, C, SC, E, I, SO, Error, OnExit, H>(BabeParams {
 	I: BlockImport<B> + Send + Sync + 'static,
 	Error: std::error::Error + Send + From<::consensus_common::Error> + From<I::Error> + 'static,
 	SO: SyncOracle + Send + Sync + Clone,
-	OnExit: Future<Item=(), Error=()>,
 {
 	let worker = BabeWorker {
 		client: client.clone(),
 		block_import,
 		env,
 		local_key,
-		inherent_data_providers: inherent_data_providers.clone(),
 		sync_oracle: sync_oracle.clone(),
 		force_authoring,
 		threshold: config.threshold(),
 	};
-	slots::start_slot_worker::<_, _, _, _, _, BabeSlotCompatible, _>(
+	register_babe_inherent_data_provider(&inherent_data_providers, config.0.slot_duration())?;
+	Ok(slots::start_slot_worker::<_, _, _, _, _, BabeSlotCompatible>(
 		config.0,
 		select_chain,
-		Arc::new(worker),
+		worker,
 		sync_oracle,
-		on_exit,
 		inherent_data_providers
-	)
+	))
 }
 
 struct BabeWorker<C, E, I, SO> {
@@ -228,7 +222,6 @@ struct BabeWorker<C, E, I, SO> {
 	env: Arc<E>,
 	local_key: Arc<sr25519::Pair>,
 	sync_oracle: SO,
-	inherent_data_providers: InherentDataProviders,
 	force_authoring: bool,
 	threshold: u64,
 }
@@ -253,13 +246,6 @@ impl<Hash, H, B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<C, E, I, SO> w
 {
 	type OnSlot = Box<dyn Future<Item=(), Error=consensus_common::Error> + Send>;
 
-	fn on_start(
-		&self,
-		slot_duration: u64
-	) -> Result<(), consensus_common::Error> {
-		register_babe_inherent_data_provider(&self.inherent_data_providers, slot_duration)
-	}
-
 	fn on_slot(
 		&self,
 		chain_head: B::Header,
@@ -985,7 +971,6 @@ mod tests {
 				client,
 				env: environ.clone(),
 				sync_oracle: DummyOracle,
-				on_exit: futures::empty(),
 				inherent_data_providers,
 				force_authoring: false,
 			}).expect("Starts babe");
diff --git a/substrate/core/consensus/slots/src/lib.rs b/substrate/core/consensus/slots/src/lib.rs
index 783cb018b9e..aee398a9ce5 100644
--- a/substrate/core/consensus/slots/src/lib.rs
+++ b/substrate/core/consensus/slots/src/lib.rs
@@ -41,7 +41,7 @@ use runtime_primitives::generic::BlockId;
 use runtime_primitives::traits::{ApiRef, Block, ProvideRuntimeApi};
 use std::fmt::Debug;
 use std::ops::Deref;
-use std::sync::{mpsc, Arc};
+use std::sync::mpsc;
 use std::thread;
 
 /// A worker that should be invoked at every new slot.
@@ -51,7 +51,8 @@ pub trait SlotWorker<B: Block> {
 	type OnSlot: IntoFuture<Item = (), Error = consensus_common::Error>;
 
 	/// Called when the proposer starts.
-	fn on_start(&self, slot_duration: u64) -> Result<(), consensus_common::Error>;
+	#[deprecated(note = "Not called. Please perform any initialization before calling start_slot_worker.")]
+	fn on_start(&self, _slot_duration: u64) -> Result<(), consensus_common::Error> { Ok(()) }
 
 	/// Called when a new slot is triggered.
 	fn on_slot(&self, chain_head: B::Header, slot_info: SlotInfo) -> Self::OnSlot;
@@ -70,7 +71,7 @@ pub trait SlotCompatible {
 pub fn start_slot_worker_thread<B, C, W, SO, SC, T, OnExit>(
 	slot_duration: SlotDuration<T>,
 	select_chain: C,
-	worker: Arc<W>,
+	worker: W,
 	sync_oracle: SO,
 	on_exit: OnExit,
 	inherent_data_providers: InherentDataProviders,
@@ -97,29 +98,19 @@ where
 			}
 		};
 
-		let slot_worker_future = match start_slot_worker::<_, _, _, T, _, SC, _>(
+		let slot_worker_future = start_slot_worker::<_, _, _, T, _, SC>(
 			slot_duration.clone(),
 			select_chain,
 			worker,
 			sync_oracle,
-			on_exit,
 			inherent_data_providers,
-		) {
-			Ok(slot_worker_future) => {
-				result_sender
-					.send(Ok(()))
-					.expect("Receive is not dropped before receiving a result; qed");
-				slot_worker_future
-			}
-			Err(e) => {
-				result_sender
-					.send(Err(e))
-					.expect("Receive is not dropped before receiving a result; qed");
-				return;
-			}
-		};
+		);
+
+		result_sender
+			.send(Ok(()))
+			.expect("Receive is not dropped before receiving a result; qed");
 
-		let _ = runtime.block_on(slot_worker_future);
+		let _ = runtime.block_on(slot_worker_future.select(on_exit).map(|_| ()));
 	});
 
 	result_recv
@@ -128,67 +119,58 @@ where
 }
 
 /// Start a new slot worker.
-pub fn start_slot_worker<B, C, W, T, SO, SC, OnExit>(
+///
+/// Every time a new slot is triggered, `worker.on_slot` is called and the future it returns is
+/// polled until completion, unless we are major syncing.
+pub fn start_slot_worker<B, C, W, T, SO, SC>(
 	slot_duration: SlotDuration<T>,
 	client: C,
-	worker: Arc<W>,
+	worker: W,
 	sync_oracle: SO,
-	on_exit: OnExit,
 	inherent_data_providers: InherentDataProviders,
-) -> Result<impl Future<Item = (), Error = ()>, consensus_common::Error>
+) -> impl Future<Item = (), Error = ()>
 where
 	B: Block,
 	C: SelectChain<B> + Clone,
 	W: SlotWorker<B>,
 	SO: SyncOracle + Send + Clone,
 	SC: SlotCompatible,
-	OnExit: Future<Item = (), Error = ()>,
 	T: SlotData + Clone,
 {
-	worker.on_start(slot_duration.slot_duration())?;
-
-	let make_authorship = move || {
-		let client = client.clone();
-		let worker = worker.clone();
-		let sync_oracle = sync_oracle.clone();
-		let SlotDuration(slot_duration) = slot_duration.clone();
-		let inherent_data_providers = inherent_data_providers.clone();
-
-		// rather than use a timer interval, we schedule our waits ourselves
-		Slots::<SC>::new(slot_duration.slot_duration(), inherent_data_providers)
-			.map_err(|e| debug!(target: "slots", "Faulty timer: {:?}", e))
-			.for_each(move |slot_info| {
-				let client = client.clone();
-				let worker = worker.clone();
-				let sync_oracle = sync_oracle.clone();
-
-				// only propose when we are not syncing.
-				if sync_oracle.is_major_syncing() {
-					debug!(target: "slots", "Skipping proposal slot due to sync.");
+	let SlotDuration(slot_duration) = slot_duration;
+
+	// rather than use a timer interval, we schedule our waits ourselves
+	let mut authorship = Slots::<SC>::new(slot_duration.slot_duration(), inherent_data_providers)
+		.map_err(|e| debug!(target: "slots", "Faulty timer: {:?}", e))
+		.for_each(move |slot_info| {
+			// only propose when we are not syncing.
+			if sync_oracle.is_major_syncing() {
+				debug!(target: "slots", "Skipping proposal slot due to sync.");
+				return Either::B(future::ok(()));
+			}
+
+			let slot_num = slot_info.number;
+			let chain_head = match client.best_chain() {
+				Ok(x) => x,
+				Err(e) => {
+					warn!(target: "slots", "Unable to author block in slot {}. \
+					no best block header: {:?}", slot_num, e);
 					return Either::B(future::ok(()));
 				}
-
-				let slot_num = slot_info.number;
-				let chain_head = match client.best_chain() {
-					Ok(x) => x,
-					Err(e) => {
-						warn!(target: "slots", "Unable to author block in slot {}. \
-						no best block header: {:?}", slot_num, e);
-						return Either::B(future::ok(()));
-					}
-				};
-
-				Either::A(worker.on_slot(chain_head, slot_info).into_future().map_err(
-					|e| warn!(target: "slots", "Encountered consensus error: {:?}", e),
-				))
-			})
-	};
-
-	let work = future::loop_fn((), move |()| {
-		let authorship_task = ::std::panic::AssertUnwindSafe(make_authorship());
-		authorship_task.catch_unwind().then(|res| {
-			match res {
-				Ok(Ok(())) => (),
+			};
+
+			Either::A(worker.on_slot(chain_head, slot_info).into_future().map_err(
+				|e| warn!(target: "slots", "Encountered consensus error: {:?}", e),
+			))
+		});
+
+	future::poll_fn(move ||
+		loop {
+			let mut authorship = std::panic::AssertUnwindSafe(&mut authorship);
+			match std::panic::catch_unwind(move || authorship.poll()) {
+				Ok(Ok(Async::Ready(()))) =>
+					warn!(target: "slots", "Slots stream has terminated unexpectedly."),
+				Ok(Ok(Async::NotReady)) => break Ok(Async::NotReady),
 				Ok(Err(())) => warn!(target: "slots", "Authorship task terminated unexpectedly. Restarting"),
 				Err(e) => {
 					if let Some(s) = e.downcast_ref::<&'static str>() {
@@ -198,12 +180,8 @@ where
 					warn!(target: "slots", "Restarting authorship task");
 				}
 			}
-
-			Ok(future::Loop::Continue(()))
-		})
-	});
-
-	Ok(work.select(on_exit).then(|_| Ok(())))
+		}
+	)
 }
 
 /// A header which has been checked
diff --git a/substrate/node-template/src/service.rs b/substrate/node-template/src/service.rs
index c8f5dfc0a19..25e7db8dec3 100644
--- a/substrate/node-template/src/service.rs
+++ b/substrate/node-template/src/service.rs
@@ -14,6 +14,7 @@ use substrate_service::{
 };
 use basic_authorship::ProposerFactory;
 use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, NothingExtra};
+use futures::prelude::*;
 use substrate_client::{self as client, LongestChain};
 use primitives::{ed25519::Pair, Pair as PairT};
 use inherents::InherentDataProviders;
@@ -75,7 +76,7 @@ construct_service_factory! {
 					let client = service.client();
 					let select_chain = service.select_chain()
 						.ok_or_else(|| ServiceError::SelectChainRequired)?;
-					executor.spawn(start_aura(
+					let aura = start_aura(
 						SlotDuration::get_or_compute(&*client)?,
 						key.clone(),
 						client.clone(),
@@ -83,10 +84,10 @@ construct_service_factory! {
 						client,
 						proposer,
 						service.network(),
-						service.on_exit(),
 						service.config.custom.inherent_data_providers.clone(),
 						service.config.force_authoring,
-					)?);
+					)?;
+					executor.spawn(aura.select(service.on_exit()).then(|_| Ok(())));
 				}
 
 				Ok(service)
diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs
index 415ddbe71be..1458392b0f5 100644
--- a/substrate/node/cli/src/service.rs
+++ b/substrate/node/cli/src/service.rs
@@ -26,6 +26,7 @@ use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration, Nothing
 use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
 use node_executor;
 use primitives::{Pair as PairT, ed25519};
+use futures::prelude::*;
 use node_primitives::Block;
 use node_runtime::{GenesisConfig, RuntimeApi};
 use substrate_service::{
@@ -92,7 +93,7 @@ construct_service_factory! {
 					let client = service.client();
 					let select_chain = service.select_chain()
 						.ok_or(ServiceError::SelectChainRequired)?;
-					executor.spawn(start_aura(
+					let aura = start_aura(
 						SlotDuration::get_or_compute(&*client)?,
 						key.clone(),
 						client,
@@ -100,10 +101,10 @@ construct_service_factory! {
 						block_import.clone(),
 						proposer,
 						service.network(),
-						service.on_exit(),
 						service.config.custom.inherent_data_providers.clone(),
 						service.config.force_authoring,
-					)?);
+					)?;
+					executor.spawn(aura.select(service.on_exit()).then(|_| Ok(())));
 
 					info!("Running Grandpa session as Authority {}", key.public());
 				}
-- 
GitLab