From 32541bde152556ad75fc74457f0f148eece8f93f Mon Sep 17 00:00:00 2001
From: Adrian Catangiu <adrian@parity.io>
Date: Fri, 25 Aug 2023 10:51:44 +0300
Subject: [PATCH] sc-consensus-beefy: restart voter on pallet reset (#14821)

When a pallet-beefy consensus reset is detected, reinitialize the worker
and resume voting instead of bringing down the task (and possibly the node).
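
The control flow, in a minimal standalone sketch (names like `run_worker`
and the error variants below are illustrative stand-ins, not the actual
gadget API): the long-running worker future now returns an `Error` instead
of `()`, and the outer loop reinitializes and retries only on the
recoverable `ConsensusReset` variant, terminating on everything else.

    // Hypothetical reduction of the retry shape introduced by this patch.
    #[derive(Debug)]
    enum Error {
        ConsensusReset,   // recoverable: pallet-beefy was reset on-chain
        StreamTerminated, // fatal: an input stream closed
    }

    // Stand-in for `BeefyWorker::run()`: should never return under
    // normal operation; if it does, the `Error` explains why.
    async fn run_worker(attempt: u32) -> Error {
        if attempt == 0 { Error::ConsensusReset } else { Error::StreamTerminated }
    }

    async fn run_gadget() {
        let mut attempt = 0;
        // Re-create per-run state (gossip engine, validator, ...) on each
        // iteration, then drive the worker until it reports an error.
        loop {
            match run_worker(attempt).await {
                // On `ConsensusReset`, reinitialize and restart the voter.
                Error::ConsensusReset => {
                    attempt += 1;
                    continue
                },
                // On any other error, log it and bring down the task.
                err => {
                    eprintln!("Error: {:?}. Terminating.", err);
                    return
                },
            }
        }
    }

    fn main() {
        // Assumes the `futures` crate for a minimal executor.
        futures::executor::block_on(run_gadget());
    }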

Signed-off-by: Adrian Catangiu <adrian@parity.io>
---
 .../beefy/src/communication/peers.rs          |   2 +-
 .../incoming_requests_handler.rs              |  11 +-
 .../src/communication/request_response/mod.rs |   5 +-
 substrate/client/consensus/beefy/src/error.rs |  12 +-
 substrate/client/consensus/beefy/src/lib.rs   | 154 ++++++++++--------
 .../client/consensus/beefy/src/worker.rs      |  42 ++---
 .../primitives/consensus/beefy/src/mmr.rs     |   6 +
 7 files changed, 130 insertions(+), 102 deletions(-)

diff --git a/substrate/client/consensus/beefy/src/communication/peers.rs b/substrate/client/consensus/beefy/src/communication/peers.rs
index 4704b8dcf45..8f2d5cc90a1 100644
--- a/substrate/client/consensus/beefy/src/communication/peers.rs
+++ b/substrate/client/consensus/beefy/src/communication/peers.rs
@@ -24,7 +24,7 @@ use std::collections::{HashMap, VecDeque};
 
 /// Report specifying a reputation change for a given peer.
 #[derive(Debug, PartialEq)]
-pub(crate) struct PeerReport {
+pub struct PeerReport {
 	pub who: PeerId,
 	pub cost_benefit: ReputationChange,
 }
diff --git a/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs b/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
index 8240dd71104..b8d8cd35434 100644
--- a/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
+++ b/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
@@ -18,7 +18,7 @@
 
 use codec::DecodeAll;
 use futures::{channel::oneshot, StreamExt};
-use log::{debug, error, trace};
+use log::{debug, trace};
 use sc_client_api::BlockBackend;
 use sc_network::{
 	config as netconfig, config::RequestResponseConfig, types::ProtocolName, PeerId,
@@ -182,7 +182,9 @@ where
 	}
 
 	/// Run [`BeefyJustifsRequestHandler`].
-	pub async fn run(mut self) {
+	///
+	/// Should never return under normal operation; if it does, the returned `Error` explains why.
+	pub async fn run(&mut self) -> Error {
 		trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler");
 
 		while let Ok(request) = self
@@ -215,9 +217,6 @@ where
 				},
 			}
 		}
-		error!(
-			target: crate::LOG_TARGET,
-			"🥩 On-demand requests receiver stream terminated, closing worker."
-		);
+		Error::RequestsReceiverStreamClosed
 	}
 }
diff --git a/substrate/client/consensus/beefy/src/communication/request_response/mod.rs b/substrate/client/consensus/beefy/src/communication/request_response/mod.rs
index 1801512fa54..4bad3b061c8 100644
--- a/substrate/client/consensus/beefy/src/communication/request_response/mod.rs
+++ b/substrate/client/consensus/beefy/src/communication/request_response/mod.rs
@@ -75,7 +75,7 @@ pub struct JustificationRequest<B: Block> {
 }
 
 #[derive(Debug, thiserror::Error)]
-pub(crate) enum Error {
+pub enum Error {
 	#[error(transparent)]
 	Client(#[from] sp_blockchain::Error),
 
@@ -102,4 +102,7 @@ pub(crate) enum Error {
 
 	#[error("Internal error while getting response.")]
 	ResponseError,
+
+	#[error("On-demand requests receiver stream terminated.")]
+	RequestsReceiverStreamClosed,
 }
diff --git a/substrate/client/consensus/beefy/src/error.rs b/substrate/client/consensus/beefy/src/error.rs
index 08b9960f41a..b4773f94019 100644
--- a/substrate/client/consensus/beefy/src/error.rs
+++ b/substrate/client/consensus/beefy/src/error.rs
@@ -34,8 +34,18 @@ pub enum Error {
 	Signature(String),
 	#[error("Session uninitialized")]
 	UninitSession,
-	#[error("pallet-beefy was reset, please restart voter")]
+	#[error("pallet-beefy was reset")]
 	ConsensusReset,
+	#[error("Block import stream terminated")]
+	BlockImportStreamTerminated,
+	#[error("Gossip Engine terminated")]
+	GossipEngineTerminated,
+	#[error("Finality proofs gossiping stream terminated")]
+	FinalityProofGossipStreamTerminated,
+	#[error("Finality stream terminated")]
+	FinalityStreamTerminated,
+	#[error("Votes gossiping stream terminated")]
+	VotesGossipStreamTerminated,
 }
 
 #[cfg(test)]
diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs
index da339dae7e1..0b3baa007c1 100644
--- a/substrate/client/consensus/beefy/src/lib.rs
+++ b/substrate/client/consensus/beefy/src/lib.rs
@@ -221,7 +221,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 	B: Block,
 	BE: Backend<B>,
 	C: Client<B, BE> + BlockBackend<B>,
-	P: PayloadProvider<B>,
+	P: PayloadProvider<B> + Clone,
 	R: ProvideRuntimeApi<B>,
 	R::Api: BeefyApi<B, AuthorityId> + MmrApi<B, MmrRootHash, NumberFor<B>>,
 	N: GossipNetwork<B> + NetworkRequest + Send + Sync + 'static,
@@ -237,7 +237,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 		min_block_delta,
 		prometheus_registry,
 		links,
-		on_demand_justifications_handler,
+		mut on_demand_justifications_handler,
 	} = beefy_params;
 
 	let BeefyNetworkParams {
@@ -248,83 +248,105 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 		..
 	} = network_params;
 
-	let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
-	// Default votes filter is to discard everything.
-	// Validator is updated later with correct starting round and set id.
-	let (gossip_validator, gossip_report_stream) =
-		communication::gossip::GossipValidator::new(known_peers.clone());
-	let gossip_validator = Arc::new(gossip_validator);
-	let mut gossip_engine = GossipEngine::new(
-		network.clone(),
-		sync.clone(),
-		gossip_protocol_name,
-		gossip_validator.clone(),
-		None,
-	);
 	let metrics = register_metrics(prometheus_registry.clone());
 
-	// The `GossipValidator` adds and removes known peers based on valid votes and network events.
-	let on_demand_justifications = OnDemandJustificationsEngine::new(
-		network.clone(),
-		justifications_protocol_name,
-		known_peers,
-		prometheus_registry.clone(),
-	);
-
 	// Subscribe to finality notifications and justifications before waiting for runtime pallet and
 	// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
 	let mut finality_notifications = client.finality_notification_stream().fuse();
-	let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
-
-	// Wait for BEEFY pallet to be active before starting voter.
-	let persisted_state =
-		match wait_for_runtime_pallet(&*runtime, &mut gossip_engine, &mut finality_notifications)
-			.await
-			.and_then(|(beefy_genesis, best_grandpa)| {
-				load_or_init_voter_state(
-					&*backend,
-					&*runtime,
-					beefy_genesis,
-					best_grandpa,
-					min_block_delta,
-				)
-			}) {
+	let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
+
+	// We re-create and re-run the worker in this loop in order to quickly reinitialize and
+	// resume after certain recoverable errors.
+	loop {
+		let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
+		// Default votes filter is to discard everything.
+		// Validator is updated later with correct starting round and set id.
+		let (gossip_validator, gossip_report_stream) =
+			communication::gossip::GossipValidator::new(known_peers.clone());
+		let gossip_validator = Arc::new(gossip_validator);
+		let mut gossip_engine = GossipEngine::new(
+			network.clone(),
+			sync.clone(),
+			gossip_protocol_name.clone(),
+			gossip_validator.clone(),
+			None,
+		);
+
+		// The `GossipValidator` adds and removes known peers based on valid votes and network
+		// events.
+		let on_demand_justifications = OnDemandJustificationsEngine::new(
+			network.clone(),
+			justifications_protocol_name.clone(),
+			known_peers,
+			prometheus_registry.clone(),
+		);
+
+		// Wait for BEEFY pallet to be active before starting voter.
+		let persisted_state = match wait_for_runtime_pallet(
+			&*runtime,
+			&mut gossip_engine,
+			&mut finality_notifications,
+		)
+		.await
+		.and_then(|(beefy_genesis, best_grandpa)| {
+			load_or_init_voter_state(
+				&*backend,
+				&*runtime,
+				beefy_genesis,
+				best_grandpa,
+				min_block_delta,
+			)
+		}) {
 			Ok(state) => state,
 			Err(e) => {
 				error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
 				return
 			},
 		};
-	// Update the gossip validator with the right starting round and set id.
-	if let Err(e) = persisted_state
-		.gossip_filter_config()
-		.map(|f| gossip_validator.update_filter(f))
-	{
-		error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
-		return
-	}
+		// Update the gossip validator with the right starting round and set id.
+		if let Err(e) = persisted_state
+			.gossip_filter_config()
+			.map(|f| gossip_validator.update_filter(f))
+		{
+			error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
+			return
+		}
 
-	let worker = worker::BeefyWorker {
-		backend,
-		payload_provider,
-		runtime,
-		sync,
-		key_store: key_store.into(),
-		gossip_engine,
-		gossip_validator,
-		gossip_report_stream,
-		on_demand_justifications,
-		links,
-		metrics,
-		pending_justifications: BTreeMap::new(),
-		persisted_state,
-	};
+		let worker = worker::BeefyWorker {
+			backend: backend.clone(),
+			payload_provider: payload_provider.clone(),
+			runtime: runtime.clone(),
+			sync: sync.clone(),
+			key_store: key_store.clone().into(),
+			gossip_engine,
+			gossip_validator,
+			gossip_report_stream,
+			on_demand_justifications,
+			links: links.clone(),
+			metrics: metrics.clone(),
+			pending_justifications: BTreeMap::new(),
+			persisted_state,
+		};
 
-	futures::future::select(
-		Box::pin(worker.run(block_import_justif, finality_notifications)),
-		Box::pin(on_demand_justifications_handler.run()),
-	)
-	.await;
+		match futures::future::select(
+			Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)),
+			Box::pin(on_demand_justifications_handler.run()),
+		)
+		.await
+		{
+			// On a `ConsensusReset` error, just reinitialize and restart the voter.
+			futures::future::Either::Left((error::Error::ConsensusReset, _)) => {
+				error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
+				continue
+			},
+			// On any other error, log it and bring down / finish the task.
+			futures::future::Either::Left((worker_err, _)) =>
+				error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err),
+			futures::future::Either::Right((odj_handler_err, _)) =>
+				error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err),
+		};
+		return
+	}
 }
 
 fn load_or_init_voter_state<B, BE, R>(
diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs
index 17a8891b061..0d3845a2703 100644
--- a/substrate/client/consensus/beefy/src/worker.rs
+++ b/substrate/client/consensus/beefy/src/worker.rs
@@ -447,11 +447,7 @@ where
 			.ok()
 			.flatten()
 			.filter(|genesis| *genesis == self.persisted_state.pallet_genesis)
-			.ok_or_else(|| {
-				let err = Error::ConsensusReset;
-				error!(target: LOG_TARGET, "🥩 Error: {}", err);
-				err
-			})?;
+			.ok_or(Error::ConsensusReset)?;
 
 		if *header.number() > self.best_grandpa_block() {
 			// update best GRANDPA finalized block we have seen
@@ -795,11 +791,12 @@ where
 	/// Main loop for BEEFY worker.
 	///
 	/// Run the main async loop which is driven by finality notifications and gossiped votes.
+	/// Should never return under normal operation; if it does, the returned `Error` explains why.
 	pub(crate) async fn run(
 		mut self,
-		mut block_import_justif: Fuse<NotificationReceiver<BeefyVersionedFinalityProof<B>>>,
-		mut finality_notifications: Fuse<FinalityNotifications<B>>,
-	) {
+		block_import_justif: &mut Fuse<NotificationReceiver<BeefyVersionedFinalityProof<B>>>,
+		finality_notifications: &mut Fuse<FinalityNotifications<B>>,
+	) -> Error {
 		info!(
 			target: LOG_TARGET,
 			"🥩 run BEEFY worker, best grandpa: #{:?}.",
@@ -848,17 +845,17 @@ where
 				// Use `select_biased!` to prioritize order below.
 				// Process finality notifications first since these drive the voter.
 				notification = finality_notifications.next() => {
-					if notification.and_then(|notif| {
-						self.handle_finality_notification(&notif).ok()
-					}).is_none() {
-						error!(target: LOG_TARGET, "🥩 Finality stream terminated, closing worker.");
-						return;
+					if let Some(notif) = notification {
+						if let Err(err) = self.handle_finality_notification(&notif) {
+							return err;
+						}
+					} else {
+						return Error::FinalityStreamTerminated;
 					}
 				},
 				// Make sure to pump gossip engine.
 				_ = gossip_engine => {
-					error!(target: LOG_TARGET, "🥩 Gossip engine has terminated, closing worker.");
-					return;
+					return Error::GossipEngineTerminated;
 				},
 				// Process incoming justifications as these can make some in-flight votes obsolete.
 				response_info = self.on_demand_justifications.next().fuse() => {
@@ -881,8 +878,7 @@ where
 							debug!(target: LOG_TARGET, "🥩 {}", err);
 						}
 					} else {
-						error!(target: LOG_TARGET, "🥩 Block import stream terminated, closing worker.");
-						return;
+						return Error::BlockImportStreamTerminated;
 					}
 				},
 				justif = gossip_proofs.next() => {
@@ -892,11 +888,7 @@ where
 							debug!(target: LOG_TARGET, "🥩 {}", err);
 						}
 					} else {
-						error!(
-							target: LOG_TARGET,
-							"🥩 Finality proofs gossiping stream terminated, closing worker."
-						);
-						return;
+						return Error::FinalityProofGossipStreamTerminated;
 					}
 				},
 				// Finally process incoming votes.
@@ -907,11 +899,7 @@ where
 							debug!(target: LOG_TARGET, "🥩 {}", err);
 						}
 					} else {
-						error!(
-							target: LOG_TARGET,
-							"🥩 Votes gossiping stream terminated, closing worker."
-						);
-						return;
+						return Error::VotesGossipStreamTerminated;
 					}
 				},
 				// Process peer reports.
diff --git a/substrate/primitives/consensus/beefy/src/mmr.rs b/substrate/primitives/consensus/beefy/src/mmr.rs
index 991dc07c5a7..660506b8763 100644
--- a/substrate/primitives/consensus/beefy/src/mmr.rs
+++ b/substrate/primitives/consensus/beefy/src/mmr.rs
@@ -162,6 +162,12 @@ mod mmr_root_provider {
 		_phantom: PhantomData<B>,
 	}
 
+	impl<B, R> Clone for MmrRootProvider<B, R> {
+		fn clone(&self) -> Self {
+			Self { runtime: self.runtime.clone(), _phantom: PhantomData }
+		}
+	}
+
 	impl<B, R> MmrRootProvider<B, R>
 	where
 		B: Block,
-- 
GitLab