From b177c2860e6d75d189e66185954a283db000f7ef Mon Sep 17 00:00:00 2001
From: Serban Iorga <serban@parity.io>
Date: Thu, 1 Feb 2024 12:24:16 +0100
Subject: [PATCH] [BEEFY] Avoid missing voting sessions during node restart
 (#3074)

Related to https://github.com/paritytech/polkadot-sdk/issues/3003 and
https://github.com/paritytech/polkadot-sdk/issues/2842

---------

Co-authored-by: Adrian Catangiu <adrian@parity.io>
---
 .../client/consensus/beefy/src/aux_schema.rs  |  20 +-
 .../incoming_requests_handler.rs              |   4 +-
 .../outgoing_requests_engine.rs               |  10 +-
 .../client/consensus/beefy/src/import.rs      |   4 +-
 substrate/client/consensus/beefy/src/lib.rs   | 189 ++--------
 .../client/consensus/beefy/src/metrics.rs     |  12 +-
 substrate/client/consensus/beefy/src/tests.rs | 138 ++++++-
 .../client/consensus/beefy/src/worker.rs      | 357 ++++++++++++++----
 8 files changed, 456 insertions(+), 278 deletions(-)

diff --git a/substrate/client/consensus/beefy/src/aux_schema.rs b/substrate/client/consensus/beefy/src/aux_schema.rs
index 409eb30d09a..944a00f8372 100644
--- a/substrate/client/consensus/beefy/src/aux_schema.rs
+++ b/substrate/client/consensus/beefy/src/aux_schema.rs
@@ -18,11 +18,10 @@
 
 //! Schema for BEEFY state persisted in the aux-db.
 
-use crate::{worker::PersistedState, LOG_TARGET};
+use crate::{error::Error, worker::PersistedState, LOG_TARGET};
 use codec::{Decode, Encode};
 use log::{info, trace};
 use sc_client_api::{backend::AuxStore, Backend};
-use sp_blockchain::{Error as ClientError, Result as ClientResult};
 use sp_runtime::traits::Block as BlockT;
 
 const VERSION_KEY: &[u8] = b"beefy_auxschema_version";
@@ -30,31 +29,33 @@ const WORKER_STATE_KEY: &[u8] = b"beefy_voter_state";
 
 const CURRENT_VERSION: u32 = 4;
 
-pub(crate) fn write_current_version<BE: AuxStore>(backend: &BE) -> ClientResult<()> {
+pub(crate) fn write_current_version<BE: AuxStore>(backend: &BE) -> Result<(), Error> {
 	info!(target: LOG_TARGET, "🥩 write aux schema version {:?}", CURRENT_VERSION);
 	AuxStore::insert_aux(backend, &[(VERSION_KEY, CURRENT_VERSION.encode().as_slice())], &[])
+		.map_err(|e| Error::Backend(e.to_string()))
 }
 
 /// Write voter state.
 pub(crate) fn write_voter_state<B: BlockT, BE: AuxStore>(
 	backend: &BE,
 	state: &PersistedState<B>,
-) -> ClientResult<()> {
+) -> Result<(), Error> {
 	trace!(target: LOG_TARGET, "🥩 persisting {:?}", state);
 	AuxStore::insert_aux(backend, &[(WORKER_STATE_KEY, state.encode().as_slice())], &[])
+		.map_err(|e| Error::Backend(e.to_string()))
 }
 
-fn load_decode<BE: AuxStore, T: Decode>(backend: &BE, key: &[u8]) -> ClientResult<Option<T>> {
-	match backend.get_aux(key)? {
+fn load_decode<BE: AuxStore, T: Decode>(backend: &BE, key: &[u8]) -> Result<Option<T>, Error> {
+	match backend.get_aux(key).map_err(|e| Error::Backend(e.to_string()))? {
 		None => Ok(None),
 		Some(t) => T::decode(&mut &t[..])
-			.map_err(|e| ClientError::Backend(format!("BEEFY DB is corrupted: {}", e)))
+			.map_err(|e| Error::Backend(format!("BEEFY DB is corrupted: {}", e)))
 			.map(Some),
 	}
 }
 
 /// Load or initialize persistent data from backend.
-pub(crate) fn load_persistent<B, BE>(backend: &BE) -> ClientResult<Option<PersistedState<B>>>
+pub(crate) fn load_persistent<B, BE>(backend: &BE) -> Result<Option<PersistedState<B>>, Error>
 where
 	B: BlockT,
 	BE: Backend<B>,
@@ -65,8 +66,7 @@ where
 		None => (),
 		Some(1) | Some(2) | Some(3) => (), // versions 1, 2 & 3 are obsolete and should be ignored
 		Some(4) => return load_decode::<_, PersistedState<B>>(backend, WORKER_STATE_KEY),
-		other =>
-			return Err(ClientError::Backend(format!("Unsupported BEEFY DB version: {:?}", other))),
+		other => return Err(Error::Backend(format!("Unsupported BEEFY DB version: {:?}", other))),
 	}
 
 	// No persistent state found in DB.
diff --git a/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs b/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
index 71c5c49b369..d856e9748a1 100644
--- a/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
+++ b/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
@@ -201,7 +201,7 @@ where
 			let peer = request.peer;
 			match self.handle_request(request) {
 				Ok(()) => {
-					metric_inc!(self, beefy_successful_justification_responses);
+					metric_inc!(self.metrics, beefy_successful_justification_responses);
 					debug!(
 						target: BEEFY_SYNC_LOG_TARGET,
 						"🥩 Handled BEEFY justification request from {:?}.", peer
@@ -209,7 +209,7 @@ where
 				},
 				Err(e) => {
 					// peer reputation changes already applied in `self.handle_request()`
-					metric_inc!(self, beefy_failed_justification_responses);
+					metric_inc!(self.metrics, beefy_failed_justification_responses);
 					debug!(
 						target: BEEFY_SYNC_LOG_TARGET,
 						"🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e,
diff --git a/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs b/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs
index 7121410ea10..992b9fa08c0 100644
--- a/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs
+++ b/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs
@@ -148,7 +148,7 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
 		if let Some(peer) = self.try_next_peer() {
 			self.request_from_peer(peer, RequestInfo { block, active_set });
 		} else {
-			metric_inc!(self, beefy_on_demand_justification_no_peer_to_request_from);
+			metric_inc!(self.metrics, beefy_on_demand_justification_no_peer_to_request_from);
 			debug!(
 				target: BEEFY_SYNC_LOG_TARGET,
 				"🥩 no good peers to request justif #{:?} from", block
@@ -194,13 +194,13 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
 				);
 				match e {
 					RequestFailure::Refused => {
-						metric_inc!(self, beefy_on_demand_justification_peer_refused);
+						metric_inc!(self.metrics, beefy_on_demand_justification_peer_refused);
 						let peer_report =
 							PeerReport { who: *peer, cost_benefit: cost::REFUSAL_RESPONSE };
 						Error::InvalidResponse(peer_report)
 					},
 					_ => {
-						metric_inc!(self, beefy_on_demand_justification_peer_error);
+						metric_inc!(self.metrics, beefy_on_demand_justification_peer_error);
 						Error::ResponseError
 					},
 				}
@@ -212,7 +212,7 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
 					&req_info.active_set,
 				)
 				.map_err(|(err, signatures_checked)| {
-					metric_inc!(self, beefy_on_demand_justification_invalid_proof);
+					metric_inc!(self.metrics, beefy_on_demand_justification_invalid_proof);
 					debug!(
 						target: BEEFY_SYNC_LOG_TARGET,
 						"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
@@ -261,7 +261,7 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
 				}
 			},
 			Ok(proof) => {
-				metric_inc!(self, beefy_on_demand_justification_good_proof);
+				metric_inc!(self.metrics, beefy_on_demand_justification_good_proof);
 				debug!(
 					target: BEEFY_SYNC_LOG_TARGET,
 					"🥩 received valid on-demand justif #{:?} from {:?}", block, peer
diff --git a/substrate/client/consensus/beefy/src/import.rs b/substrate/client/consensus/beefy/src/import.rs
index 6eced17b58f..fc19ecc3014 100644
--- a/substrate/client/consensus/beefy/src/import.rs
+++ b/substrate/client/consensus/beefy/src/import.rs
@@ -165,7 +165,7 @@ where
 						self.justification_sender
 							.notify(|| Ok::<_, ()>(proof))
 							.expect("the closure always returns Ok; qed.");
-						metric_inc!(self, beefy_good_justification_imports);
+						metric_inc!(self.metrics, beefy_good_justification_imports);
 					},
 					Err(err) => {
 						debug!(
@@ -174,7 +174,7 @@ where
 							number,
 							err,
 						);
-						metric_inc!(self, beefy_bad_justification_imports);
+						metric_inc!(self.metrics, beefy_bad_justification_imports);
 					},
 				}
 			},
diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs
index 2e2e22288e3..1f10d8099d8 100644
--- a/substrate/client/consensus/beefy/src/lib.rs
+++ b/substrate/client/consensus/beefy/src/lib.rs
@@ -27,10 +27,9 @@ use crate::{
 			outgoing_requests_engine::OnDemandJustificationsEngine, BeefyJustifsRequestHandler,
 		},
 	},
+	error::Error,
 	import::BeefyBlockImport,
 	metrics::register_metrics,
-	round::Rounds,
-	worker::PersistedState,
 };
 use futures::{stream::Fuse, StreamExt};
 use log::{debug, error, info, warn};
@@ -47,17 +46,11 @@ use sp_blockchain::{
 use sp_consensus::{Error as ConsensusError, SyncOracle};
 use sp_consensus_beefy::{
 	ecdsa_crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet,
-	BEEFY_ENGINE_ID,
 };
 use sp_keystore::KeystorePtr;
 use sp_mmr_primitives::MmrApi;
 use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero};
-use std::{
-	collections::{BTreeMap, VecDeque},
-	marker::PhantomData,
-	sync::Arc,
-	time::Duration,
-};
+use std::{collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};
 
 mod aux_schema;
 mod error;
@@ -309,14 +302,17 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 			},
 		};
 
-		let persisted_state = match load_or_init_voter_state(
-			&*backend,
-			&*runtime,
-			beefy_genesis,
-			best_grandpa,
-			min_block_delta,
-		)
-		.await
+		let mut worker_base = worker::BeefyWorkerBase {
+			backend: backend.clone(),
+			runtime: runtime.clone(),
+			key_store: key_store.clone().into(),
+			metrics: metrics.clone(),
+			_phantom: Default::default(),
+		};
+
+		let persisted_state = match worker_base
+			.load_or_init_state(beefy_genesis, best_grandpa, min_block_delta)
+			.await
 		{
 			Ok(state) => state,
 			Err(e) => {
@@ -334,14 +330,11 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 		}
 
 		let worker = worker::BeefyWorker {
-			backend: backend.clone(),
+			base: worker_base,
 			payload_provider: payload_provider.clone(),
-			runtime: runtime.clone(),
 			sync: sync.clone(),
-			key_store: key_store.clone().into(),
 			comms: beefy_comms,
 			links: links.clone(),
-			metrics: metrics.clone(),
 			pending_justifications: BTreeMap::new(),
 			persisted_state,
 		};
@@ -368,43 +361,6 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 	}
 }
 
-async fn load_or_init_voter_state<B, BE, R>(
-	backend: &BE,
-	runtime: &R,
-	beefy_genesis: NumberFor<B>,
-	best_grandpa: <B as Block>::Header,
-	min_block_delta: u32,
-) -> ClientResult<PersistedState<B>>
-where
-	B: Block,
-	BE: Backend<B>,
-	R: ProvideRuntimeApi<B>,
-	R::Api: BeefyApi<B, AuthorityId>,
-{
-	// Initialize voter state from AUX DB if compatible.
-	if let Some(mut state) = crate::aux_schema::load_persistent(backend)?
-		// Verify state pallet genesis matches runtime.
-		.filter(|state| state.pallet_genesis() == beefy_genesis)
-	{
-		// Overwrite persisted state with current best GRANDPA block.
-		state.set_best_grandpa(best_grandpa.clone());
-		// Overwrite persisted data with newly provided `min_block_delta`.
-		state.set_min_block_delta(min_block_delta);
-		info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
-
-		// Make sure that all the headers that we need have been synced.
-		let mut header = best_grandpa.clone();
-		while *header.number() > state.best_beefy() {
-			header =
-				wait_for_parent_header(backend.blockchain(), header, HEADER_SYNC_DELAY).await?;
-		}
-		return Ok(state)
-	}
-
-	// No valid voter-state persisted, re-initialize from pallet genesis.
-	initialize_voter_state(backend, runtime, beefy_genesis, best_grandpa, min_block_delta).await
-}
-
 /// Waits until the parent header of `current` is available and returns it.
 ///
 /// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers.
@@ -415,7 +371,7 @@ async fn wait_for_parent_header<B, BC>(
 	blockchain: &BC,
 	current: <B as Block>::Header,
 	delay: Duration,
-) -> ClientResult<<B as Block>::Header>
+) -> Result<<B as Block>::Header, Error>
 where
 	B: Block,
 	BC: BlockchainBackend<B>,
@@ -423,10 +379,13 @@ where
 	if *current.number() == Zero::zero() {
 		let msg = format!("header {} is Genesis, there is no parent for it", current.hash());
 		warn!(target: LOG_TARGET, "{}", msg);
-		return Err(ClientError::UnknownBlock(msg))
+		return Err(Error::Backend(msg));
 	}
 	loop {
-		match blockchain.header(*current.parent_hash())? {
+		match blockchain
+			.header(*current.parent_hash())
+			.map_err(|e| Error::Backend(e.to_string()))?
+		{
 			Some(parent) => return Ok(parent),
 			None => {
 				info!(
@@ -441,108 +400,6 @@ where
 	}
 }
 
-// If no persisted state present, walk back the chain from first GRANDPA notification to either:
-//  - latest BEEFY finalized block, or if none found on the way,
-//  - BEEFY pallet genesis;
-// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to finalize.
-async fn initialize_voter_state<B, BE, R>(
-	backend: &BE,
-	runtime: &R,
-	beefy_genesis: NumberFor<B>,
-	best_grandpa: <B as Block>::Header,
-	min_block_delta: u32,
-) -> ClientResult<PersistedState<B>>
-where
-	B: Block,
-	BE: Backend<B>,
-	R: ProvideRuntimeApi<B>,
-	R::Api: BeefyApi<B, AuthorityId>,
-{
-	let blockchain = backend.blockchain();
-
-	let beefy_genesis = runtime
-		.runtime_api()
-		.beefy_genesis(best_grandpa.hash())
-		.ok()
-		.flatten()
-		.filter(|genesis| *genesis == beefy_genesis)
-		.ok_or_else(|| ClientError::Backend("BEEFY pallet expected to be active.".into()))?;
-	// Walk back the imported blocks and initialize voter either, at the last block with
-	// a BEEFY justification, or at pallet genesis block; voter will resume from there.
-	let mut sessions = VecDeque::new();
-	let mut header = best_grandpa.clone();
-	let state = loop {
-		if let Some(true) = blockchain
-			.justifications(header.hash())
-			.ok()
-			.flatten()
-			.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
-		{
-			info!(
-				target: LOG_TARGET,
-				"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
-				*header.number()
-			);
-			let best_beefy = *header.number();
-			// If no session boundaries detected so far, just initialize new rounds here.
-			if sessions.is_empty() {
-				let active_set = expect_validator_set(runtime, backend, &header).await?;
-				let mut rounds = Rounds::new(best_beefy, active_set);
-				// Mark the round as already finalized.
-				rounds.conclude(best_beefy);
-				sessions.push_front(rounds);
-			}
-			let state = PersistedState::checked_new(
-				best_grandpa,
-				best_beefy,
-				sessions,
-				min_block_delta,
-				beefy_genesis,
-			)
-			.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?;
-			break state
-		}
-
-		if *header.number() == beefy_genesis {
-			// We've reached BEEFY genesis, initialize voter here.
-			let genesis_set = expect_validator_set(runtime, backend, &header).await?;
-			info!(
-				target: LOG_TARGET,
-				"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
-				Starting voting rounds at block {:?}, genesis validator set {:?}.",
-				beefy_genesis,
-				genesis_set,
-			);
-
-			sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
-			break PersistedState::checked_new(
-				best_grandpa,
-				Zero::zero(),
-				sessions,
-				min_block_delta,
-				beefy_genesis,
-			)
-			.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?
-		}
-
-		if let Some(active) = worker::find_authorities_change::<B>(&header) {
-			info!(
-				target: LOG_TARGET,
-				"🥩 Marking block {:?} as BEEFY Mandatory.",
-				*header.number()
-			);
-			sessions.push_front(Rounds::new(*header.number(), active));
-		}
-
-		// Move up the chain.
-		header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
-	};
-
-	aux_schema::write_current_version(backend)?;
-	aux_schema::write_voter_state(backend, &state)?;
-	Ok(state)
-}
-
 /// Wait for BEEFY runtime pallet to be available, return active validator set.
 /// Should be called only once during worker initialization.
 async fn wait_for_runtime_pallet<B, R>(
@@ -595,7 +452,7 @@ async fn expect_validator_set<B, BE, R>(
 	runtime: &R,
 	backend: &BE,
 	at_header: &B::Header,
-) -> ClientResult<ValidatorSet<AuthorityId>>
+) -> Result<ValidatorSet<AuthorityId>, Error>
 where
 	B: Block,
 	BE: Backend<B>,
@@ -618,7 +475,9 @@ where
 				// Move up the chain. Ultimately we'll get it from chain genesis state, or error out
 				// there.
 				None =>
-					header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?,
+					header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY)
+						.await
+						.map_err(|e| Error::Backend(e.to_string()))?,
 			}
 		}
 	}
diff --git a/substrate/client/consensus/beefy/src/metrics.rs b/substrate/client/consensus/beefy/src/metrics.rs
index 031748bdcea..ef3928d79fa 100644
--- a/substrate/client/consensus/beefy/src/metrics.rs
+++ b/substrate/client/consensus/beefy/src/metrics.rs
@@ -305,10 +305,10 @@ pub(crate) fn register_metrics<T: PrometheusRegister>(
 // if expr does not derive `Display`.
 #[macro_export]
 macro_rules! metric_set {
-	($self:ident, $m:ident, $v:expr) => {{
+	($metrics:expr, $m:ident, $v:expr) => {{
 		let val: u64 = format!("{}", $v).parse().unwrap();
 
-		if let Some(metrics) = $self.metrics.as_ref() {
+		if let Some(metrics) = $metrics.as_ref() {
 			metrics.$m.set(val);
 		}
 	}};
@@ -316,8 +316,8 @@ macro_rules! metric_set {
 
 #[macro_export]
 macro_rules! metric_inc {
-	($self:ident, $m:ident) => {{
-		if let Some(metrics) = $self.metrics.as_ref() {
+	($metrics:expr, $m:ident) => {{
+		if let Some(metrics) = $metrics.as_ref() {
 			metrics.$m.inc();
 		}
 	}};
@@ -325,8 +325,8 @@ macro_rules! metric_inc {
 
 #[macro_export]
 macro_rules! metric_get {
-	($self:ident, $m:ident) => {{
-		$self.metrics.as_ref().map(|metrics| metrics.$m.clone())
+	($metrics:expr, $m:ident) => {{
+		$metrics.as_ref().map(|metrics| metrics.$m.clone())
 	}};
 }
 
diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs
index 17065432564..7e61e877c1d 100644
--- a/substrate/client/consensus/beefy/src/tests.rs
+++ b/substrate/client/consensus/beefy/src/tests.rs
@@ -28,10 +28,12 @@ use crate::{
 		},
 		request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler},
 	},
+	error::Error,
 	gossip_protocol_name,
 	justification::*,
-	load_or_init_voter_state, wait_for_runtime_pallet, BeefyRPCLinks, BeefyVoterLinks, KnownPeers,
-	PersistedState,
+	wait_for_runtime_pallet,
+	worker::{BeefyWorkerBase, PersistedState},
+	BeefyRPCLinks, BeefyVoterLinks, KnownPeers,
 };
 use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt};
 use parking_lot::Mutex;
@@ -363,7 +365,7 @@ async fn voter_init_setup(
 	net: &mut BeefyTestNet,
 	finality: &mut futures::stream::Fuse<FinalityNotifications<Block>>,
 	api: &TestApi,
-) -> sp_blockchain::Result<PersistedState<Block>> {
+) -> Result<PersistedState<Block>, Error> {
 	let backend = net.peer(0).client().as_backend();
 	let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
 	let (gossip_validator, _) = GossipValidator::new(known_peers);
@@ -378,7 +380,14 @@ async fn voter_init_setup(
 	);
 	let (beefy_genesis, best_grandpa) =
 		wait_for_runtime_pallet(api, &mut gossip_engine, finality).await.unwrap();
-	load_or_init_voter_state(&*backend, api, beefy_genesis, best_grandpa, 1).await
+	let mut worker_base = BeefyWorkerBase {
+		backend,
+		runtime: Arc::new(api.clone()),
+		key_store: None.into(),
+		metrics: None,
+		_phantom: Default::default(),
+	};
+	worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await
 }
 
 // Spawns beefy voters. Returns a future to spawn on the runtime.
@@ -1072,9 +1081,15 @@ async fn should_initialize_voter_at_custom_genesis() {
 	);
 	let (beefy_genesis, best_grandpa) =
 		wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
-	let persisted_state = load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1)
-		.await
-		.unwrap();
+	let mut worker_base = BeefyWorkerBase {
+		backend: backend.clone(),
+		runtime: Arc::new(api),
+		key_store: None.into(),
+		metrics: None,
+		_phantom: Default::default(),
+	};
+	let persisted_state =
+		worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
 
 	// Test initialization at session boundary.
 	// verify voter initialized with single session starting at block `custom_pallet_genesis` (7)
@@ -1107,10 +1122,15 @@ async fn should_initialize_voter_at_custom_genesis() {
 	// the network state persists and uses the old `GossipEngine` initialized for `peer(0)`
 	let (beefy_genesis, best_grandpa) =
 		wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
+	let mut worker_base = BeefyWorkerBase {
+		backend: backend.clone(),
+		runtime: Arc::new(api),
+		key_store: None.into(),
+		metrics: None,
+		_phantom: Default::default(),
+	};
 	let new_persisted_state =
-		load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1)
-			.await
-			.unwrap();
+		worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
 
 	// verify voter initialized with single session starting at block `new_pallet_genesis` (10)
 	let sessions = new_persisted_state.voting_oracle().sessions();
@@ -1285,6 +1305,104 @@ async fn should_initialize_voter_at_custom_genesis_when_state_unavailable() {
 	assert_eq!(state, persisted_state);
 }
 
+#[tokio::test]
+async fn should_catch_up_when_loading_saved_voter_state() {
+	let keys = &[BeefyKeyring::Alice];
+	let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
+	let mut net = BeefyTestNet::new(1);
+	let backend = net.peer(0).client().as_backend();
+
+	// push 30 blocks with `AuthorityChange` digests every 10 blocks
+	let hashes = net.generate_blocks_and_sync(30, 10, &validator_set, false).await;
+	let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
+	// finalize 13 without justifications
+	net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap();
+
+	let api = TestApi::with_validator_set(&validator_set);
+
+	// load persistent state - nothing in DB, should init at genesis
+	//
+	// NOTE: code from `voter_init_setup()` is moved here because the new network event system
+	// doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the
+	// first `GossipEngine`
+	let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
+	let (gossip_validator, _) = GossipValidator::new(known_peers);
+	let gossip_validator = Arc::new(gossip_validator);
+	let mut gossip_engine = sc_network_gossip::GossipEngine::new(
+		net.peer(0).network_service().clone(),
+		net.peer(0).sync_service().clone(),
+		net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(),
+		"/beefy/whatever",
+		gossip_validator,
+		None,
+	);
+	let (beefy_genesis, best_grandpa) =
+		wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
+	let mut worker_base = BeefyWorkerBase {
+		backend: backend.clone(),
+		runtime: Arc::new(api.clone()),
+		key_store: None.into(),
+		metrics: None,
+		_phantom: Default::default(),
+	};
+	let persisted_state =
+		worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
+
+	// Test initialization at session boundary.
+	// verify voter initialized with two sessions starting at blocks 1 and 10
+	let sessions = persisted_state.voting_oracle().sessions();
+	assert_eq!(sessions.len(), 2);
+	assert_eq!(sessions[0].session_start(), 1);
+	assert_eq!(sessions[1].session_start(), 10);
+	let rounds = persisted_state.active_round().unwrap();
+	assert_eq!(rounds.session_start(), 1);
+	assert_eq!(rounds.validator_set_id(), validator_set.id());
+
+	// verify next vote target is mandatory block 1
+	assert_eq!(persisted_state.best_beefy(), 0);
+	assert_eq!(persisted_state.best_grandpa_number(), 13);
+	assert_eq!(persisted_state.voting_oracle().voting_target(), Some(1));
+
+	// verify state also saved to db
+	assert!(verify_persisted_version(&*backend));
+	let state = load_persistent(&*backend).unwrap().unwrap();
+	assert_eq!(state, persisted_state);
+
+	// now let's consider that the node goes offline, and then it restarts after a while
+
+	// finalize 25 without justifications
+	net.peer(0).client().as_client().finalize_block(hashes[25], None).unwrap();
+	// load persistent state - state preset in DB
+	// the network state persists and uses the old `GossipEngine` initialized for `peer(0)`
+	let (beefy_genesis, best_grandpa) =
+		wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
+	let mut worker_base = BeefyWorkerBase {
+		backend: backend.clone(),
+		runtime: Arc::new(api),
+		key_store: None.into(),
+		metrics: None,
+		_phantom: Default::default(),
+	};
+	let persisted_state =
+		worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
+
+	// Verify voter initialized with old sessions plus a new one starting at block 20.
+	// There shouldn't be any duplicates.
+	let sessions = persisted_state.voting_oracle().sessions();
+	assert_eq!(sessions.len(), 3);
+	assert_eq!(sessions[0].session_start(), 1);
+	assert_eq!(sessions[1].session_start(), 10);
+	assert_eq!(sessions[2].session_start(), 20);
+	let rounds = persisted_state.active_round().unwrap();
+	assert_eq!(rounds.session_start(), 1);
+	assert_eq!(rounds.validator_set_id(), validator_set.id());
+
+	// verify next vote target is mandatory block 1
+	assert_eq!(persisted_state.best_beefy(), 0);
+	assert_eq!(persisted_state.best_grandpa_number(), 25);
+	assert_eq!(persisted_state.voting_oracle().voting_target(), Some(1));
+}
+
 #[tokio::test]
 async fn beefy_finalizing_after_pallet_genesis() {
 	sp_tracing::try_init_simple();
diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs
index 26f940f05f1..e67e3e0f76a 100644
--- a/substrate/client/consensus/beefy/src/worker.rs
+++ b/substrate/client/consensus/beefy/src/worker.rs
@@ -17,18 +17,20 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 use crate::{
+	aux_schema,
 	communication::{
 		gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator},
 		peers::PeerReport,
 		request_response::outgoing_requests_engine::{OnDemandJustificationsEngine, ResponseInfo},
 	},
 	error::Error,
+	expect_validator_set,
 	justification::BeefyVersionedFinalityProof,
 	keystore::{BeefyKeystore, BeefySignatureHasher},
 	metric_inc, metric_set,
 	metrics::VoterMetrics,
 	round::{Rounds, VoteImportResult},
-	BeefyVoterLinks, LOG_TARGET,
+	wait_for_parent_header, BeefyVoterLinks, HEADER_SYNC_DELAY, LOG_TARGET,
 };
 use codec::{Codec, Decode, DecodeAll, Encode};
 use futures::{stream::Fuse, FutureExt, StreamExt};
@@ -38,6 +40,7 @@ use sc_network_gossip::GossipEngine;
 use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver};
 use sp_api::ProvideRuntimeApi;
 use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
+use sp_blockchain::Backend as BlockchainBackend;
 use sp_consensus::SyncOracle;
 use sp_consensus_beefy::{
 	check_equivocation_proof,
@@ -53,6 +56,7 @@ use sp_runtime::{
 use std::{
 	collections::{BTreeMap, BTreeSet, VecDeque},
 	fmt::Debug,
+	marker::PhantomData,
 	sync::Arc,
 };
 
@@ -176,6 +180,13 @@ impl<B: Block> VoterOracle<B> {
 		}
 	}
 
+	// Check if an observed session can be added to the Oracle.
+	fn can_add_session(&self, session_start: NumberFor<B>) -> bool {
+		let latest_known_session_start =
+			self.sessions.back().map(|session| session.session_start());
+		Some(session_start) > latest_known_session_start
+	}
+
 	/// Add new observed session to the Oracle.
 	pub fn add_session(&mut self, rounds: Rounds<B>) {
 		self.sessions.push_back(rounds);
@@ -236,12 +247,10 @@ impl<B: Block> VoterOracle<B> {
 	/// Return `Some(number)` if we should be voting on block `number`,
 	/// return `None` if there is no block we should vote on.
 	pub fn voting_target(&self) -> Option<NumberFor<B>> {
-		let rounds = if let Some(r) = self.sessions.front() {
-			r
-		} else {
+		let rounds = self.sessions.front().or_else(|| {
 			debug!(target: LOG_TARGET, "🥩 No voting round started");
-			return None
-		};
+			None
+		})?;
 		let best_grandpa = *self.best_grandpa_block_header.number();
 		let best_beefy = self.best_beefy_block;
 
@@ -327,50 +336,171 @@ pub(crate) struct BeefyComms<B: Block> {
 	pub on_demand_justifications: OnDemandJustificationsEngine<B>,
 }
 
-/// A BEEFY worker plays the BEEFY protocol
-pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
+pub(crate) struct BeefyWorkerBase<B: Block, BE, RuntimeApi> {
 	// utilities
 	pub backend: Arc<BE>,
-	pub payload_provider: P,
 	pub runtime: Arc<RuntimeApi>,
-	pub sync: Arc<S>,
 	pub key_store: BeefyKeystore,
 
-	// communication (created once, but returned and reused if worker is restarted/reinitialized)
-	pub comms: BeefyComms<B>,
-
-	// channels
-	/// Links between the block importer, the background voter and the RPC layer.
-	pub links: BeefyVoterLinks<B>,
-
-	// voter state
 	/// BEEFY client metrics.
 	pub metrics: Option<VoterMetrics>,
-	/// Buffer holding justifications for future processing.
-	pub pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
-	/// Persisted voter state.
-	pub persisted_state: PersistedState<B>,
+
+	pub _phantom: PhantomData<B>,
 }
 
-impl<B, BE, P, R, S> BeefyWorker<B, BE, P, R, S>
+impl<B, BE, R> BeefyWorkerBase<B, BE, R>
 where
 	B: Block + Codec,
 	BE: Backend<B>,
-	P: PayloadProvider<B>,
-	S: SyncOracle,
 	R: ProvideRuntimeApi<B>,
 	R::Api: BeefyApi<B, AuthorityId>,
 {
-	fn best_grandpa_block(&self) -> NumberFor<B> {
-		*self.persisted_state.voting_oracle.best_grandpa_block_header.number()
-	}
+	// If no persisted state present, walk back the chain from first GRANDPA notification to either:
+	//  - latest BEEFY finalized block, or if none found on the way,
+	//  - BEEFY pallet genesis;
+	// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to
+	// finalize.
+	async fn init_state(
+		&self,
+		beefy_genesis: NumberFor<B>,
+		best_grandpa: <B as Block>::Header,
+		min_block_delta: u32,
+	) -> Result<PersistedState<B>, Error> {
+		let blockchain = self.backend.blockchain();
 
-	fn voting_oracle(&self) -> &VoterOracle<B> {
-		&self.persisted_state.voting_oracle
+		let beefy_genesis = self
+			.runtime
+			.runtime_api()
+			.beefy_genesis(best_grandpa.hash())
+			.ok()
+			.flatten()
+			.filter(|genesis| *genesis == beefy_genesis)
+			.ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?;
+		// Walk back the imported blocks and initialize voter either, at the last block with
+		// a BEEFY justification, or at pallet genesis block; voter will resume from there.
+		let mut sessions = VecDeque::new();
+		let mut header = best_grandpa.clone();
+		let state = loop {
+			if let Some(true) = blockchain
+				.justifications(header.hash())
+				.ok()
+				.flatten()
+				.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
+			{
+				info!(
+					target: LOG_TARGET,
+					"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
+					*header.number()
+				);
+				let best_beefy = *header.number();
+				// If no session boundaries detected so far, just initialize new rounds here.
+				if sessions.is_empty() {
+					let active_set =
+						expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header)
+							.await?;
+					let mut rounds = Rounds::new(best_beefy, active_set);
+					// Mark the round as already finalized.
+					rounds.conclude(best_beefy);
+					sessions.push_front(rounds);
+				}
+				let state = PersistedState::checked_new(
+					best_grandpa,
+					best_beefy,
+					sessions,
+					min_block_delta,
+					beefy_genesis,
+				)
+				.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?;
+				break state
+			}
+
+			if *header.number() == beefy_genesis {
+				// We've reached BEEFY genesis, initialize voter here.
+				let genesis_set =
+					expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header)
+						.await?;
+				info!(
+					target: LOG_TARGET,
+					"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
+					Starting voting rounds at block {:?}, genesis validator set {:?}.",
+					beefy_genesis,
+					genesis_set,
+				);
+
+				sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
+				break PersistedState::checked_new(
+					best_grandpa,
+					Zero::zero(),
+					sessions,
+					min_block_delta,
+					beefy_genesis,
+				)
+				.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?
+			}
+
+			if let Some(active) = find_authorities_change::<B>(&header) {
+				info!(
+					target: LOG_TARGET,
+					"🥩 Marking block {:?} as BEEFY Mandatory.",
+					*header.number()
+				);
+				sessions.push_front(Rounds::new(*header.number(), active));
+			}
+
+			// Move up the chain.
+			header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
+		};
+
+		aux_schema::write_current_version(self.backend.as_ref())?;
+		aux_schema::write_voter_state(self.backend.as_ref(), &state)?;
+		Ok(state)
 	}
 
-	fn active_rounds(&mut self) -> Result<&Rounds<B>, Error> {
-		self.persisted_state.voting_oracle.active_rounds()
+	pub async fn load_or_init_state(
+		&mut self,
+		beefy_genesis: NumberFor<B>,
+		best_grandpa: <B as Block>::Header,
+		min_block_delta: u32,
+	) -> Result<PersistedState<B>, Error> {
+		// Initialize voter state from AUX DB if compatible.
+		if let Some(mut state) = crate::aux_schema::load_persistent(self.backend.as_ref())?
+			// Verify state pallet genesis matches runtime.
+			.filter(|state| state.pallet_genesis() == beefy_genesis)
+		{
+			// Overwrite persisted state with current best GRANDPA block.
+			state.set_best_grandpa(best_grandpa.clone());
+			// Overwrite persisted data with newly provided `min_block_delta`.
+			state.set_min_block_delta(min_block_delta);
+			info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
+
+			// Make sure that all the headers that we need have been synced.
+			let mut new_sessions = vec![];
+			let mut header = best_grandpa.clone();
+			while *header.number() > state.best_beefy() {
+				if state.voting_oracle.can_add_session(*header.number()) {
+					if let Some(active) = find_authorities_change::<B>(&header) {
+						new_sessions.push((active, *header.number()));
+					}
+				}
+				header =
+					wait_for_parent_header(self.backend.blockchain(), header, HEADER_SYNC_DELAY)
+						.await?;
+			}
+
+			// Make sure we didn't miss any sessions during node restart.
+			for (validator_set, new_session_start) in new_sessions.drain(..).rev() {
+				info!(
+					target: LOG_TARGET,
+					"🥩 Handling missed BEEFY session after node restart: {:?}.",
+					new_session_start
+				);
+				self.init_session_at(&mut state, validator_set, new_session_start);
+			}
+			return Ok(state)
+		}
+
+		// No valid voter-state persisted, re-initialize from pallet genesis.
+		self.init_state(beefy_genesis, best_grandpa, min_block_delta).await
 	}
 
 	/// Verify `active` validator set for `block` against the key store
@@ -394,7 +524,7 @@ where
 		if store.intersection(&active).count() == 0 {
 			let msg = "no authority public key found in store".to_string();
 			debug!(target: LOG_TARGET, "🥩 for block {:?} {}", block, msg);
-			metric_inc!(self, beefy_no_authority_found_in_store);
+			metric_inc!(self.metrics, beefy_no_authority_found_in_store);
 			Err(Error::Keystore(msg))
 		} else {
 			Ok(())
@@ -404,13 +534,14 @@ where
 	/// Handle session changes by starting new voting round for mandatory blocks.
 	fn init_session_at(
 		&mut self,
+		persisted_state: &mut PersistedState<B>,
 		validator_set: ValidatorSet<AuthorityId>,
 		new_session_start: NumberFor<B>,
 	) {
 		debug!(target: LOG_TARGET, "🥩 New active validator set: {:?}", validator_set);
 
 		// BEEFY should finalize a mandatory block during each session.
-		if let Ok(active_session) = self.active_rounds() {
+		if let Ok(active_session) = persisted_state.voting_oracle.active_rounds() {
 			if !active_session.mandatory_done() {
 				debug!(
 					target: LOG_TARGET,
@@ -418,7 +549,7 @@ where
 					validator_set.id(),
 					active_session.validator_set_id(),
 				);
-				metric_inc!(self, beefy_lagging_sessions);
+				metric_inc!(self.metrics, beefy_lagging_sessions);
 			}
 		}
 
@@ -428,10 +559,10 @@ where
 		}
 
 		let id = validator_set.id();
-		self.persisted_state
+		persisted_state
 			.voting_oracle
 			.add_session(Rounds::new(new_session_start, validator_set));
-		metric_set!(self, beefy_validator_set_id, id);
+		metric_set!(self.metrics, beefy_validator_set_id, id);
 		info!(
 			target: LOG_TARGET,
 			"🥩 New Rounds for validator set id: {:?} with session_start {:?}",
@@ -439,6 +570,61 @@ where
 			new_session_start
 		);
 	}
+}
+
+/// A BEEFY worker/voter that follows the BEEFY protocol
+pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
+	pub base: BeefyWorkerBase<B, BE, RuntimeApi>,
+
+	// utils
+	pub payload_provider: P,
+	pub sync: Arc<S>,
+
+	// communication (created once, but returned and reused if worker is restarted/reinitialized)
+	pub comms: BeefyComms<B>,
+
+	// channels
+	/// Links between the block importer, the background voter and the RPC layer.
+	pub links: BeefyVoterLinks<B>,
+
+	// voter state
+	/// Buffer holding justifications for future processing.
+	pub pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
+	/// Persisted voter state.
+	pub persisted_state: PersistedState<B>,
+}
+
+impl<B, BE, P, R, S> BeefyWorker<B, BE, P, R, S>
+where
+	B: Block + Codec,
+	BE: Backend<B>,
+	P: PayloadProvider<B>,
+	S: SyncOracle,
+	R: ProvideRuntimeApi<B>,
+	R::Api: BeefyApi<B, AuthorityId>,
+{
+	fn best_grandpa_block(&self) -> NumberFor<B> {
+		*self.persisted_state.voting_oracle.best_grandpa_block_header.number()
+	}
+
+	fn voting_oracle(&self) -> &VoterOracle<B> {
+		&self.persisted_state.voting_oracle
+	}
+
+	#[cfg(test)]
+	fn active_rounds(&mut self) -> Result<&Rounds<B>, Error> {
+		self.persisted_state.voting_oracle.active_rounds()
+	}
+
+	/// Handle session changes by starting new voting round for mandatory blocks.
+	fn init_session_at(
+		&mut self,
+		validator_set: ValidatorSet<AuthorityId>,
+		new_session_start: NumberFor<B>,
+	) {
+		self.base
+			.init_session_at(&mut self.persisted_state, validator_set, new_session_start);
+	}
 
 	fn handle_finality_notification(
 		&mut self,
@@ -452,7 +638,8 @@ where
 		);
 		let header = &notification.header;
 
-		self.runtime
+		self.base
+			.runtime
 			.runtime_api()
 			.beefy_genesis(header.hash())
 			.ok()
@@ -466,7 +653,7 @@ where
 			self.persisted_state.set_best_grandpa(header.clone());
 
 			// Check all (newly) finalized blocks for new session(s).
-			let backend = self.backend.clone();
+			let backend = self.base.backend.clone();
 			for header in notification
 				.tree_route
 				.iter()
@@ -485,7 +672,7 @@ where
 			}
 
 			if new_session_added {
-				crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
+				crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
 					.map_err(|e| Error::Backend(e.to_string()))?;
 			}
 
@@ -519,7 +706,7 @@ where
 						true,
 					);
 				},
-			RoundAction::Drop => metric_inc!(self, beefy_stale_votes),
+			RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_votes),
 			RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
 		};
 		Ok(())
@@ -539,23 +726,23 @@ where
 		match self.voting_oracle().triage_round(block_num)? {
 			RoundAction::Process => {
 				debug!(target: LOG_TARGET, "🥩 Process justification for round: {:?}.", block_num);
-				metric_inc!(self, beefy_imported_justifications);
+				metric_inc!(self.base.metrics, beefy_imported_justifications);
 				self.finalize(justification)?
 			},
 			RoundAction::Enqueue => {
 				debug!(target: LOG_TARGET, "🥩 Buffer justification for round: {:?}.", block_num);
 				if self.pending_justifications.len() < MAX_BUFFERED_JUSTIFICATIONS {
 					self.pending_justifications.entry(block_num).or_insert(justification);
-					metric_inc!(self, beefy_buffered_justifications);
+					metric_inc!(self.base.metrics, beefy_buffered_justifications);
 				} else {
-					metric_inc!(self, beefy_buffered_justifications_dropped);
+					metric_inc!(self.base.metrics, beefy_buffered_justifications_dropped);
 					warn!(
 						target: LOG_TARGET,
 						"🥩 Buffer justification dropped for round: {:?}.", block_num
 					);
 				}
 			},
-			RoundAction::Drop => metric_inc!(self, beefy_stale_justifications),
+			RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_justifications),
 		};
 		Ok(())
 	}
@@ -577,7 +764,7 @@ where
 				// We created the `finality_proof` and know to be valid.
 				// New state is persisted after finalization.
 				self.finalize(finality_proof.clone())?;
-				metric_inc!(self, beefy_good_votes_processed);
+				metric_inc!(self.base.metrics, beefy_good_votes_processed);
 				return Ok(Some(finality_proof))
 			},
 			VoteImportResult::Ok => {
@@ -588,17 +775,20 @@ where
 					.map(|(mandatory_num, _)| mandatory_num == block_number)
 					.unwrap_or(false)
 				{
-					crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
-						.map_err(|e| Error::Backend(e.to_string()))?;
+					crate::aux_schema::write_voter_state(
+						&*self.base.backend,
+						&self.persisted_state,
+					)
+					.map_err(|e| Error::Backend(e.to_string()))?;
 				}
-				metric_inc!(self, beefy_good_votes_processed);
+				metric_inc!(self.base.metrics, beefy_good_votes_processed);
 			},
 			VoteImportResult::Equivocation(proof) => {
-				metric_inc!(self, beefy_equivocation_votes);
+				metric_inc!(self.base.metrics, beefy_equivocation_votes);
 				self.report_equivocation(proof)?;
 			},
-			VoteImportResult::Invalid => metric_inc!(self, beefy_invalid_votes),
-			VoteImportResult::Stale => metric_inc!(self, beefy_stale_votes),
+			VoteImportResult::Invalid => metric_inc!(self.base.metrics, beefy_invalid_votes),
+			VoteImportResult::Stale => metric_inc!(self.base.metrics, beefy_stale_votes),
 		};
 		Ok(None)
 	}
@@ -625,14 +815,15 @@ where
 
 		// Set new best BEEFY block number.
 		self.persisted_state.set_best_beefy(block_num);
-		crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
+		crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
 			.map_err(|e| Error::Backend(e.to_string()))?;
 
-		metric_set!(self, beefy_best_block, block_num);
+		metric_set!(self.base.metrics, beefy_best_block, block_num);
 
 		self.comms.on_demand_justifications.cancel_requests_older_than(block_num);
 
 		if let Err(e) = self
+			.base
 			.backend
 			.blockchain()
 			.expect_block_hash_from_id(&BlockId::Number(block_num))
@@ -642,7 +833,8 @@ where
 					.notify(|| Ok::<_, ()>(hash))
 					.expect("forwards closure result; the closure always returns Ok; qed.");
 
-				self.backend
+				self.base
+					.backend
 					.append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode()))
 			}) {
 			debug!(
@@ -679,12 +871,16 @@ where
 
 			for (num, justification) in justifs_to_process.into_iter() {
 				debug!(target: LOG_TARGET, "🥩 Handle buffered justification for: {:?}.", num);
-				metric_inc!(self, beefy_imported_justifications);
+				metric_inc!(self.base.metrics, beefy_imported_justifications);
 				if let Err(err) = self.finalize(justification) {
 					error!(target: LOG_TARGET, "🥩 Error finalizing block: {}", err);
 				}
 			}
-			metric_set!(self, beefy_buffered_justifications, self.pending_justifications.len());
+			metric_set!(
+				self.base.metrics,
+				beefy_buffered_justifications,
+				self.pending_justifications.len()
+			);
 		}
 		Ok(())
 	}
@@ -693,7 +889,7 @@ where
 	fn try_to_vote(&mut self) -> Result<(), Error> {
 		// Vote if there's now a new vote target.
 		if let Some(target) = self.voting_oracle().voting_target() {
-			metric_set!(self, beefy_should_vote_on, target);
+			metric_set!(self.base.metrics, beefy_should_vote_on, target);
 			if target > self.persisted_state.best_voted {
 				self.do_vote(target)?;
 			}
@@ -713,6 +909,7 @@ where
 			self.persisted_state.voting_oracle.best_grandpa_block_header.clone()
 		} else {
 			let hash = self
+				.base
 				.backend
 				.blockchain()
 				.expect_block_hash_from_id(&BlockId::Number(target_number))
@@ -724,7 +921,7 @@ where
 					Error::Backend(err_msg)
 				})?;
 
-			self.backend.blockchain().expect_header(hash).map_err(|err| {
+			self.base.backend.blockchain().expect_header(hash).map_err(|err| {
 				let err_msg = format!(
 					"Couldn't get header for block #{:?} ({:?}) (error: {:?}), skipping vote..",
 					target_number, hash, err
@@ -744,7 +941,7 @@ where
 		let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?;
 		let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id());
 
-		let authority_id = if let Some(id) = self.key_store.authority_id(validators) {
+		let authority_id = if let Some(id) = self.base.key_store.authority_id(validators) {
 			debug!(target: LOG_TARGET, "🥩 Local authority id: {:?}", id);
 			id
 		} else {
@@ -758,7 +955,7 @@ where
 		let commitment = Commitment { payload, block_number: target_number, validator_set_id };
 		let encoded_commitment = commitment.encode();
 
-		let signature = match self.key_store.sign(&authority_id, &encoded_commitment) {
+		let signature = match self.base.key_store.sign(&authority_id, &encoded_commitment) {
 			Ok(sig) => sig,
 			Err(err) => {
 				warn!(target: LOG_TARGET, "🥩 Error signing commitment: {:?}", err);
@@ -783,7 +980,7 @@ where
 				.gossip_engine
 				.gossip_message(proofs_topic::<B>(), encoded_proof, true);
 		} else {
-			metric_inc!(self, beefy_votes_sent);
+			metric_inc!(self.base.metrics, beefy_votes_sent);
 			debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
 			let encoded_vote = GossipMessage::<B>::Vote(vote).encode();
 			self.comms.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
@@ -791,8 +988,8 @@ where
 
 		// Persist state after vote to avoid double voting in case of voter restarts.
 		self.persisted_state.best_voted = target_number;
-		metric_set!(self, beefy_best_voted, target_number);
-		crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
+		metric_set!(self.base.metrics, beefy_best_voted, target_number);
+		crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
 			.map_err(|e| Error::Backend(e.to_string()))
 	}
 
@@ -966,7 +1163,7 @@ where
 		if !check_equivocation_proof::<_, _, BeefySignatureHasher>(&proof) {
 			debug!(target: LOG_TARGET, "🥩 Skip report for bad equivocation {:?}", proof);
 			return Ok(())
-		} else if let Some(local_id) = self.key_store.authority_id(validators) {
+		} else if let Some(local_id) = self.base.key_store.authority_id(validators) {
 			if offender_id == local_id {
 				debug!(target: LOG_TARGET, "🥩 Skip equivocation report for own equivocation");
 				return Ok(())
@@ -975,6 +1172,7 @@ where
 
 		let number = *proof.round_number();
 		let hash = self
+			.base
 			.backend
 			.blockchain()
 			.expect_block_hash_from_id(&BlockId::Number(number))
@@ -985,7 +1183,7 @@ where
 				);
 				Error::Backend(err_msg)
 			})?;
-		let runtime_api = self.runtime.runtime_api();
+		let runtime_api = self.base.runtime.runtime_api();
 		// generate key ownership proof at that block
 		let key_owner_proof = match runtime_api
 			.generate_key_ownership_proof(hash, validator_set_id, offender_id)
@@ -1002,7 +1200,7 @@ where
 		};
 
 		// submit equivocation report at **best** block
-		let best_block_hash = self.backend.blockchain().info().best_hash;
+		let best_block_hash = self.base.backend.blockchain().info().best_hash;
 		runtime_api
 			.submit_report_equivocation_unsigned_extrinsic(best_block_hash, proof, key_owner_proof)
 			.map_err(Error::RuntimeApi)?;
@@ -1028,7 +1226,7 @@ where
 
 /// Calculate next block number to vote on.
 ///
-/// Return `None` if there is no voteable target yet.
+/// Return `None` if there is no votable target yet.
 fn vote_target<N>(best_grandpa: N, best_beefy: N, session_start: N, min_delta: u32) -> Option<N>
 where
 	N: AtLeast32Bit + Copy + Debug,
@@ -1189,14 +1387,17 @@ pub(crate) mod tests {
 			on_demand_justifications,
 		};
 		BeefyWorker {
-			backend,
+			base: BeefyWorkerBase {
+				backend,
+				runtime: api,
+				key_store: Some(keystore).into(),
+				metrics,
+				_phantom: Default::default(),
+			},
 			payload_provider,
-			runtime: api,
-			key_store: Some(keystore).into(),
+			sync: Arc::new(sync),
 			links,
 			comms,
-			metrics,
-			sync: Arc::new(sync),
 			pending_justifications: BTreeMap::new(),
 			persisted_state,
 		}
@@ -1470,19 +1671,19 @@ pub(crate) mod tests {
 		let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
 
 		// keystore doesn't contain other keys than validators'
-		assert_eq!(worker.verify_validator_set(&1, &validator_set), Ok(()));
+		assert_eq!(worker.base.verify_validator_set(&1, &validator_set), Ok(()));
 
 		// unknown `Bob` key
 		let keys = &[Keyring::Bob];
 		let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
 		let err_msg = "no authority public key found in store".to_string();
 		let expected = Err(Error::Keystore(err_msg));
-		assert_eq!(worker.verify_validator_set(&1, &validator_set), expected);
+		assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected);
 
 		// worker has no keystore
-		worker.key_store = None.into();
+		worker.base.key_store = None.into();
 		let expected_err = Err(Error::Keystore("no Keystore".into()));
-		assert_eq!(worker.verify_validator_set(&1, &validator_set), expected_err);
+		assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected_err);
 	}
 
 	#[tokio::test]
@@ -1634,7 +1835,7 @@ pub(crate) mod tests {
 
 		let mut net = BeefyTestNet::new(1);
 		let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
-		worker.runtime = api_alice.clone();
+		worker.base.runtime = api_alice.clone();
 
 		// let there be a block with num = 1:
 		let _ = net.peer(0).push_blocks(1, false);
-- 
GitLab