diff --git a/prdoc/pr_3435.prdoc b/prdoc/pr_3435.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..7d7896bff7d405a0a1dd34f09bc2aff0834b9ff2
--- /dev/null
+++ b/prdoc/pr_3435.prdoc
@@ -0,0 +1,16 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Fix BEEFY-related gossip messages error logs
+
+doc:
+  - audience: Node Operator
+    description: |
+      Added logic to pump the gossip engine while waiting for other things
+      to make sure gossiped messages get consumed (practically discarded
+      until worker is fully initialized).
+      This fixes an issue where node operators saw error logs, and fixes
+      potential RAM bloat when BEEFY initialization takes a long time
+      (i.e. during clean sync).
+crates:
+  - name: sc-consensus-beefy
diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs
index 8a0b0a74308f1a6d6bf3bae327b87fac6fb6cdde..eb43c9173d751bf75bc4973aca248d80fb68ca48 100644
--- a/substrate/client/consensus/beefy/src/communication/gossip.rs
+++ b/substrate/client/consensus/beefy/src/communication/gossip.rs
@@ -56,6 +56,8 @@ pub(super) enum Action<H> {
 	Keep(H, ReputationChange),
 	// discard, applying cost/benefit to originator.
 	Discard(ReputationChange),
+	// ignore, no cost/benefit applied to originator.
+	DiscardNoReport,
 }
 
 /// An outcome of examining a message.
@@ -68,7 +70,7 @@ enum Consider {
 	/// Message is from the future. Reject.
 	RejectFuture,
 	/// Message cannot be evaluated. Reject.
-	RejectOutOfScope,
+	CannotEvaluate,
 }
 
 /// BEEFY gossip message type that gets encoded and sent on the network.
@@ -168,18 +170,14 @@ impl<B: Block> Filter<B> {
 			.as_ref()
 			.map(|f|
 				// only from current set and only [filter.start, filter.end]
-				if set_id < f.validator_set.id() {
+				if set_id < f.validator_set.id() || round < f.start {
 					Consider::RejectPast
-				} else if set_id > f.validator_set.id() {
-					Consider::RejectFuture
-				} else if round < f.start {
-					Consider::RejectPast
-				} else if round > f.end {
+				} else if set_id > f.validator_set.id() || round > f.end {
 					Consider::RejectFuture
 				} else {
 					Consider::Accept
 				})
-			.unwrap_or(Consider::RejectOutOfScope)
+			.unwrap_or(Consider::CannotEvaluate)
 	}
 
 	/// Return true if `round` is >= than `max(session_start, best_beefy)`,
@@ -199,7 +197,7 @@ impl<B: Block> Filter<B> {
 					Consider::Accept
 				}
 			)
-			.unwrap_or(Consider::RejectOutOfScope)
+			.unwrap_or(Consider::CannotEvaluate)
 	}
 
 	/// Add new _known_ `round` to the set of seen valid justifications.
@@ -244,7 +242,7 @@ where
 	pub(crate) fn new(
 		known_peers: Arc<Mutex<KnownPeers<B>>>,
 	) -> (GossipValidator<B>, TracingUnboundedReceiver<PeerReport>) {
-		let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 10_000);
+		let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 100_000);
 		let val = GossipValidator {
 			votes_topic: votes_topic::<B>(),
 			justifs_topic: proofs_topic::<B>(),
@@ -289,7 +287,9 @@ where
 			match filter.consider_vote(round, set_id) {
 				Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
 				Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
-				Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
+				// When we can't evaluate, it's our fault (e.g. filter not initialized yet), we
+				// discard the vote without punishing or rewarding the sending peer.
+				Consider::CannotEvaluate => return Action::DiscardNoReport,
 				Consider::Accept => {},
 			}
 
@@ -330,7 +330,9 @@ where
 			match guard.consider_finality_proof(round, set_id) {
 				Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
 				Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
-				Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
+				// When we can't evaluate, it's our fault (e.g. filter not initialized yet), we
+				// discard the proof without punishing or rewarding the sending peer.
+				Consider::CannotEvaluate => return Action::DiscardNoReport,
 				Consider::Accept => {},
 			}
 
@@ -357,7 +359,9 @@ where
 						Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF)
 					}
 				})
-				.unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE))
+				// When we can't evaluate, it's our fault (e.g. filter not initialized yet), we
+				// discard the proof without punishing or rewarding the sending peer.
+				.unwrap_or(Action::DiscardNoReport)
 		};
 		if matches!(action, Action::Keep(_, _)) {
 			self.gossip_filter.write().mark_round_as_proven(round);
@@ -404,6 +408,7 @@ where
 				self.report(*sender, cb);
 				ValidationResult::Discard
 			},
+			Action::DiscardNoReport => ValidationResult::Discard,
 		}
 	}
 
@@ -579,8 +584,8 @@ pub(crate) mod tests {
 		// filter not initialized
 		let res = gv.validate(&mut context, &sender, &encoded);
 		assert!(matches!(res, ValidationResult::Discard));
-		expected_report.cost_benefit = cost::OUT_OF_SCOPE_MESSAGE;
-		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
+		// nothing reported
+		assert!(report_stream.try_recv().is_err());
 
 		gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
 		// nothing in cache first time
diff --git a/substrate/client/consensus/beefy/src/communication/mod.rs b/substrate/client/consensus/beefy/src/communication/mod.rs
index 3827559057dde856d201ebb9c9ff71a1d7d11f47..6fda63688e6952839c18e1bbc9ff50ff0c5f7c21 100644
--- a/substrate/client/consensus/beefy/src/communication/mod.rs
+++ b/substrate/client/consensus/beefy/src/communication/mod.rs
@@ -90,8 +90,6 @@ mod cost {
 	pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "BEEFY: Bad signature");
 	// Message received with vote from voter not in validator set.
 	pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "BEEFY: Unknown voter");
-	// A message received that cannot be evaluated relative to our current state.
-	pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "BEEFY: Out-of-scope message");
 	// Message containing invalid proof.
 	pub(super) const INVALID_PROOF: Rep = Rep::new(-5000, "BEEFY: Invalid commit");
 	// Reputation cost per signature checked for invalid proof.
diff --git a/substrate/client/consensus/beefy/src/import.rs b/substrate/client/consensus/beefy/src/import.rs
index fc19ecc301422d74d5da5a289ed641810b1022f1..ed8ed68c4e8d0d378728ba87d1ffe726f6c4c11a 100644
--- a/substrate/client/consensus/beefy/src/import.rs
+++ b/substrate/client/consensus/beefy/src/import.rs
@@ -159,7 +159,7 @@ where
 						// The proof is valid and the block is imported and final, we can import.
 						debug!(
 							target: LOG_TARGET,
-							"🥩 import justif {:?} for block number {:?}.", proof, number
+							"🥩 import justif {} for block number {:?}.", proof, number
 						);
 						// Send the justification to the BEEFY voter for processing.
 						self.justification_sender
diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs
index 11a105561e47f095f22f1db7db58d9b47ebd5ed5..323af1bc8305c38bf7a34b4690b212954b7b0864 100644
--- a/substrate/client/consensus/beefy/src/lib.rs
+++ b/substrate/client/consensus/beefy/src/lib.rs
@@ -31,7 +31,7 @@ use crate::{
 	import::BeefyBlockImport,
 	metrics::register_metrics,
 };
-use futures::{stream::Fuse, StreamExt};
+use futures::{stream::Fuse, FutureExt, StreamExt};
 use log::{debug, error, info, warn};
 use parking_lot::Mutex;
 use prometheus::Registry;
@@ -40,17 +40,21 @@ use sc_consensus::BlockImport;
 use sc_network::{NetworkRequest, NotificationService, ProtocolName};
 use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing};
 use sp_api::ProvideRuntimeApi;
-use sp_blockchain::{
-	Backend as BlockchainBackend, Error as ClientError, HeaderBackend, Result as ClientResult,
-};
+use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend};
 use sp_consensus::{Error as ConsensusError, SyncOracle};
 use sp_consensus_beefy::{
-	ecdsa_crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet,
+	ecdsa_crypto::AuthorityId, BeefyApi, ConsensusLog, MmrRootHash, PayloadProvider, ValidatorSet,
+	BEEFY_ENGINE_ID,
 };
 use sp_keystore::KeystorePtr;
 use sp_mmr_primitives::MmrApi;
 use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero};
-use std::{collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};
+use std::{
+	collections::{BTreeMap, VecDeque},
+	marker::PhantomData,
+	sync::Arc,
+	time::Duration,
+};
 
 mod aux_schema;
 mod error;
@@ -63,9 +67,19 @@ pub mod communication;
 pub mod import;
 pub mod justification;
 
+use crate::{
+	communication::{gossip::GossipValidator, peers::PeerReport},
+	justification::BeefyVersionedFinalityProof,
+	keystore::BeefyKeystore,
+	metrics::VoterMetrics,
+	round::Rounds,
+	worker::{BeefyWorker, PersistedState},
+};
 pub use communication::beefy_protocol_name::{
 	gossip_protocol_name, justifications_protocol_name as justifs_protocol_name,
 };
+use sc_utils::mpsc::TracingUnboundedReceiver;
+use sp_runtime::generic::OpaqueDigestItemId;
 
 #[cfg(test)]
 mod tests;
@@ -209,6 +223,247 @@ pub struct BeefyParams<B: Block, BE, C, N, P, R, S> {
 	/// Handler for incoming BEEFY justifications requests from a remote peer.
 	pub on_demand_justifications_handler: BeefyJustifsRequestHandler<B, C>,
 }
+/// Helper object holding BEEFY worker communication/gossip components.
+///
+/// These are created once, but will be reused if worker is restarted/reinitialized.
+pub(crate) struct BeefyComms<B: Block> {
+	pub gossip_engine: GossipEngine<B>,
+	pub gossip_validator: Arc<GossipValidator<B>>,
+	pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
+	pub on_demand_justifications: OnDemandJustificationsEngine<B>,
+}
+
+/// Helper builder object for building [worker::BeefyWorker].
+///
+/// It has to do it in two steps: initialization and build, because the first step can sleep waiting
+/// for certain chain and backend conditions, and while sleeping we still need to pump the
+/// GossipEngine. Once initialization is done, the GossipEngine (and other pieces) are added to get
+/// the complete [worker::BeefyWorker] object.
+pub(crate) struct BeefyWorkerBuilder<B: Block, BE, RuntimeApi> {
+	// utilities
+	backend: Arc<BE>,
+	runtime: Arc<RuntimeApi>,
+	key_store: BeefyKeystore<AuthorityId>,
+	// voter metrics
+	metrics: Option<VoterMetrics>,
+	persisted_state: PersistedState<B>,
+}
+
+impl<B, BE, R> BeefyWorkerBuilder<B, BE, R>
+where
+	B: Block + codec::Codec,
+	BE: Backend<B>,
+	R: ProvideRuntimeApi<B>,
+	R::Api: BeefyApi<B, AuthorityId>,
+{
+	/// This will wait for the chain to enable BEEFY (if not yet enabled) and also wait for the
+	/// backend to sync all headers required by the voter to build a contiguous chain of mandatory
+	/// justifications. Then it builds the initial voter state using a combination of previously
+	/// persisted state in AUX DB and latest chain information/progress.
+	///
+	/// Returns a sane `BeefyWorkerBuilder` that can build the `BeefyWorker`.
+	pub async fn async_initialize(
+		backend: Arc<BE>,
+		runtime: Arc<R>,
+		key_store: BeefyKeystore<AuthorityId>,
+		metrics: Option<VoterMetrics>,
+		min_block_delta: u32,
+		gossip_validator: Arc<GossipValidator<B>>,
+		finality_notifications: &mut Fuse<FinalityNotifications<B>>,
+	) -> Result<Self, Error> {
+		// Wait for BEEFY pallet to be active before starting voter.
+		let (beefy_genesis, best_grandpa) =
+			wait_for_runtime_pallet(&*runtime, finality_notifications).await?;
+
+		let persisted_state = Self::load_or_init_state(
+			beefy_genesis,
+			best_grandpa,
+			min_block_delta,
+			backend.clone(),
+			runtime.clone(),
+			&key_store,
+			&metrics,
+		)
+		.await?;
+		// Update the gossip validator with the right starting round and set id.
+		persisted_state
+			.gossip_filter_config()
+			.map(|f| gossip_validator.update_filter(f))?;
+
+		Ok(BeefyWorkerBuilder { backend, runtime, key_store, metrics, persisted_state })
+	}
+
+	/// Takes rest of missing pieces as params and builds the `BeefyWorker`.
+	pub fn build<P, S>(
+		self,
+		payload_provider: P,
+		sync: Arc<S>,
+		comms: BeefyComms<B>,
+		links: BeefyVoterLinks<B>,
+		pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
+	) -> BeefyWorker<B, BE, P, R, S> {
+		BeefyWorker {
+			backend: self.backend,
+			runtime: self.runtime,
+			key_store: self.key_store,
+			metrics: self.metrics,
+			persisted_state: self.persisted_state,
+			payload_provider,
+			sync,
+			comms,
+			links,
+			pending_justifications,
+		}
+	}
+
+	// If no persisted state present, walk back the chain from first GRANDPA notification to either:
+	//  - latest BEEFY finalized block, or if none found on the way,
+	//  - BEEFY pallet genesis;
+	// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to
+	// finalize.
+	async fn init_state(
+		beefy_genesis: NumberFor<B>,
+		best_grandpa: <B as Block>::Header,
+		min_block_delta: u32,
+		backend: Arc<BE>,
+		runtime: Arc<R>,
+	) -> Result<PersistedState<B>, Error> {
+		let blockchain = backend.blockchain();
+
+		let beefy_genesis = runtime
+			.runtime_api()
+			.beefy_genesis(best_grandpa.hash())
+			.ok()
+			.flatten()
+			.filter(|genesis| *genesis == beefy_genesis)
+			.ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?;
+		// Walk back the imported blocks and initialize voter either, at the last block with
+		// a BEEFY justification, or at pallet genesis block; voter will resume from there.
+		let mut sessions = VecDeque::new();
+		let mut header = best_grandpa.clone();
+		let state = loop {
+			if let Some(true) = blockchain
+				.justifications(header.hash())
+				.ok()
+				.flatten()
+				.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
+			{
+				debug!(
+					target: LOG_TARGET,
+					"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
+					*header.number()
+				);
+				let best_beefy = *header.number();
+				// If no session boundaries detected so far, just initialize new rounds here.
+				if sessions.is_empty() {
+					let active_set =
+						expect_validator_set(runtime.as_ref(), backend.as_ref(), &header).await?;
+					let mut rounds = Rounds::new(best_beefy, active_set);
+					// Mark the round as already finalized.
+					rounds.conclude(best_beefy);
+					sessions.push_front(rounds);
+				}
+				let state = PersistedState::checked_new(
+					best_grandpa,
+					best_beefy,
+					sessions,
+					min_block_delta,
+					beefy_genesis,
+				)
+				.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?;
+				break state
+			}
+
+			if *header.number() == beefy_genesis {
+				// We've reached BEEFY genesis, initialize voter here.
+				let genesis_set =
+					expect_validator_set(runtime.as_ref(), backend.as_ref(), &header).await?;
+				info!(
+					target: LOG_TARGET,
+					"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
+					Starting voting rounds at block {:?}, genesis validator set {:?}.",
+					beefy_genesis,
+					genesis_set,
+				);
+
+				sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
+				break PersistedState::checked_new(
+					best_grandpa,
+					Zero::zero(),
+					sessions,
+					min_block_delta,
+					beefy_genesis,
+				)
+				.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?
+			}
+
+			if let Some(active) = find_authorities_change::<B>(&header) {
+				debug!(
+					target: LOG_TARGET,
+					"🥩 Marking block {:?} as BEEFY Mandatory.",
+					*header.number()
+				);
+				sessions.push_front(Rounds::new(*header.number(), active));
+			}
+
+			// Move up the chain.
+			header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
+		};
+
+		aux_schema::write_current_version(backend.as_ref())?;
+		aux_schema::write_voter_state(backend.as_ref(), &state)?;
+		Ok(state)
+	}
+
+	async fn load_or_init_state(
+		beefy_genesis: NumberFor<B>,
+		best_grandpa: <B as Block>::Header,
+		min_block_delta: u32,
+		backend: Arc<BE>,
+		runtime: Arc<R>,
+		key_store: &BeefyKeystore<AuthorityId>,
+		metrics: &Option<VoterMetrics>,
+	) -> Result<PersistedState<B>, Error> {
+		// Initialize voter state from AUX DB if compatible.
+		if let Some(mut state) = crate::aux_schema::load_persistent(backend.as_ref())?
+			// Verify state pallet genesis matches runtime.
+			.filter(|state| state.pallet_genesis() == beefy_genesis)
+		{
+			// Overwrite persisted state with current best GRANDPA block.
+			state.set_best_grandpa(best_grandpa.clone());
+			// Overwrite persisted data with newly provided `min_block_delta`.
+			state.set_min_block_delta(min_block_delta);
+			debug!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
+
+			// Make sure that all the headers that we need have been synced.
+			let mut new_sessions = vec![];
+			let mut header = best_grandpa.clone();
+			while *header.number() > state.best_beefy() {
+				if state.voting_oracle().can_add_session(*header.number()) {
+					if let Some(active) = find_authorities_change::<B>(&header) {
+						new_sessions.push((active, *header.number()));
+					}
+				}
+				header =
+					wait_for_parent_header(backend.blockchain(), header, HEADER_SYNC_DELAY).await?;
+			}
+
+			// Make sure we didn't miss any sessions during node restart.
+			for (validator_set, new_session_start) in new_sessions.drain(..).rev() {
+				debug!(
+					target: LOG_TARGET,
+					"🥩 Handling missed BEEFY session after node restart: {:?}.",
+					new_session_start
+				);
+				state.init_session_at(new_session_start, validator_set, key_store, metrics);
+			}
+			return Ok(state)
+		}
+
+		// No valid voter-state persisted, re-initialize from pallet genesis.
+		Self::init_state(beefy_genesis, best_grandpa, min_block_delta, backend, runtime).await
+	}
+}
 
 /// Start the BEEFY gadget.
 ///
@@ -277,7 +532,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 		known_peers,
 		prometheus_registry.clone(),
 	);
-	let mut beefy_comms = worker::BeefyComms {
+	let mut beefy_comms = BeefyComms {
 		gossip_engine,
 		gossip_validator,
 		gossip_report_stream,
@@ -287,57 +542,45 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 	// We re-create and re-run the worker in this loop in order to quickly reinit and resume after
 	// select recoverable errors.
 	loop {
-		// Wait for BEEFY pallet to be active before starting voter.
-		let (beefy_genesis, best_grandpa) = match wait_for_runtime_pallet(
-			&*runtime,
-			&mut beefy_comms.gossip_engine,
-			&mut finality_notifications,
-		)
-		.await
-		{
-			Ok(res) => res,
-			Err(e) => {
-				error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
-				return
-			},
-		};
-
-		let mut worker_base = worker::BeefyWorkerBase {
-			backend: backend.clone(),
-			runtime: runtime.clone(),
-			key_store: key_store.clone().into(),
-			metrics: metrics.clone(),
-			_phantom: Default::default(),
-		};
-
-		let persisted_state = match worker_base
-			.load_or_init_state(beefy_genesis, best_grandpa, min_block_delta)
-			.await
-		{
-			Ok(state) => state,
-			Err(e) => {
-				error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
-				return
-			},
+		// Make sure to pump gossip engine while waiting for initialization conditions.
+		let worker_builder = loop {
+			futures::select! {
+				builder_init_result = BeefyWorkerBuilder::async_initialize(
+					backend.clone(),
+					runtime.clone(),
+					key_store.clone().into(),
+					metrics.clone(),
+					min_block_delta,
+					beefy_comms.gossip_validator.clone(),
+					&mut finality_notifications,
+				).fuse() => {
+					match builder_init_result {
+						Ok(builder) => break builder,
+						Err(e) => {
+							error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", e);
+							return
+						},
+					}
+				},
+				// Pump peer reports
+				_ = &mut beefy_comms.gossip_report_stream.next() => {
+					continue
+				},
+				// Pump gossip engine.
+				_ = &mut beefy_comms.gossip_engine => {
+					error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated.");
+					return
+				}
+			}
 		};
-		// Update the gossip validator with the right starting round and set id.
-		if let Err(e) = persisted_state
-			.gossip_filter_config()
-			.map(|f| beefy_comms.gossip_validator.update_filter(f))
-		{
-			error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
-			return
-		}
 
-		let worker = worker::BeefyWorker {
-			base: worker_base,
-			payload_provider: payload_provider.clone(),
-			sync: sync.clone(),
-			comms: beefy_comms,
-			links: links.clone(),
-			pending_justifications: BTreeMap::new(),
-			persisted_state,
-		};
+		let worker = worker_builder.build(
+			payload_provider.clone(),
+			sync.clone(),
+			beefy_comms,
+			links.clone(),
+			BTreeMap::new(),
+		);
 
 		match futures::future::select(
 			Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)),
@@ -404,9 +647,8 @@ where
 /// Should be called only once during worker initialization.
 async fn wait_for_runtime_pallet<B, R>(
 	runtime: &R,
-	mut gossip_engine: &mut GossipEngine<B>,
 	finality: &mut Fuse<FinalityNotifications<B>>,
-) -> ClientResult<(NumberFor<B>, <B as Block>::Header)>
+) -> Result<(NumberFor<B>, <B as Block>::Header), Error>
 where
 	B: Block,
 	R: ProvideRuntimeApi<B>,
@@ -414,33 +656,24 @@ where
 {
 	info!(target: LOG_TARGET, "🥩 BEEFY gadget waiting for BEEFY pallet to become available...");
 	loop {
-		futures::select! {
-			notif = finality.next() => {
-				let notif = match notif {
-					Some(notif) => notif,
-					None => break
-				};
-				let at = notif.header.hash();
-				if let Some(start) = runtime.runtime_api().beefy_genesis(at).ok().flatten() {
-					if *notif.header.number() >= start {
-						// Beefy pallet available, return header for best grandpa at the time.
-						info!(
-							target: LOG_TARGET,
-							"🥩 BEEFY pallet available: block {:?} beefy genesis {:?}",
-							notif.header.number(), start
-						);
-						return Ok((start, notif.header))
-					}
-				}
-			},
-			_ = gossip_engine => {
-				break
+		let notif = finality.next().await.ok_or_else(|| {
+			let err_msg = "🥩 Finality stream has unexpectedly terminated.".into();
+			error!(target: LOG_TARGET, "{}", err_msg);
+			Error::Backend(err_msg)
+		})?;
+		let at = notif.header.hash();
+		if let Some(start) = runtime.runtime_api().beefy_genesis(at).ok().flatten() {
+			if *notif.header.number() >= start {
+				// Beefy pallet available, return header for best grandpa at the time.
+				info!(
+					target: LOG_TARGET,
+					"🥩 BEEFY pallet available: block {:?} beefy genesis {:?}",
+					notif.header.number(), start
+				);
+				return Ok((start, notif.header))
 			}
 		}
 	}
-	let err_msg = "🥩 Gossip engine has unexpectedly terminated.".into();
-	error!(target: LOG_TARGET, "{}", err_msg);
-	Err(ClientError::Backend(err_msg))
 }
 
 /// Provides validator set active `at_header`. It tries to get it from state, otherwise falls
@@ -474,7 +707,7 @@ where
 		if let Ok(Some(active)) = runtime.runtime_api().validator_set(header.hash()) {
 			return Ok(active)
 		} else {
-			match worker::find_authorities_change::<B>(&header) {
+			match find_authorities_change::<B>(&header) {
 				Some(active) => return Ok(active),
 				// Move up the chain. Ultimately we'll get it from chain genesis state, or error out
 				// there.
@@ -486,3 +719,18 @@ where
 		}
 	}
 }
+
+/// Scan the `header` digest log for a BEEFY validator set change. Return either the new
+/// validator set or `None` in case no validator set change has been signaled.
+pub(crate) fn find_authorities_change<B>(header: &B::Header) -> Option<ValidatorSet<AuthorityId>>
+where
+	B: Block,
+{
+	let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID);
+
+	let filter = |log: ConsensusLog<AuthorityId>| match log {
+		ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set),
+		_ => None,
+	};
+	header.digest().convert_first(|l| l.try_to(id).and_then(filter))
+}
diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs
index 0e6ff210b158c18334a749992d096e2d54c77a01..d106c9dcd88165f929085972483e090dbf78c559 100644
--- a/substrate/client/consensus/beefy/src/tests.rs
+++ b/substrate/client/consensus/beefy/src/tests.rs
@@ -32,8 +32,8 @@ use crate::{
 	gossip_protocol_name,
 	justification::*,
 	wait_for_runtime_pallet,
-	worker::{BeefyWorkerBase, PersistedState},
-	BeefyRPCLinks, BeefyVoterLinks, KnownPeers,
+	worker::PersistedState,
+	BeefyRPCLinks, BeefyVoterLinks, BeefyWorkerBuilder, KnownPeers,
 };
 use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt};
 use parking_lot::Mutex;
@@ -368,27 +368,19 @@ async fn voter_init_setup(
 	api: &TestApi,
 ) -> Result<PersistedState<Block>, Error> {
 	let backend = net.peer(0).client().as_backend();
-	let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
-	let (gossip_validator, _) = GossipValidator::new(known_peers);
-	let gossip_validator = Arc::new(gossip_validator);
-	let mut gossip_engine = sc_network_gossip::GossipEngine::new(
-		net.peer(0).network_service().clone(),
-		net.peer(0).sync_service().clone(),
-		net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(),
-		"/beefy/whatever",
-		gossip_validator,
-		None,
-	);
-	let (beefy_genesis, best_grandpa) =
-		wait_for_runtime_pallet(api, &mut gossip_engine, finality).await.unwrap();
-	let mut worker_base = BeefyWorkerBase {
+	let (beefy_genesis, best_grandpa) = wait_for_runtime_pallet(api, finality).await.unwrap();
+	let key_store = None.into();
+	let metrics = None;
+	BeefyWorkerBuilder::load_or_init_state(
+		beefy_genesis,
+		best_grandpa,
+		1,
 		backend,
-		runtime: Arc::new(api.clone()),
-		key_store: None.into(),
-		metrics: None,
-		_phantom: Default::default(),
-	};
-	worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await
+		Arc::new(api.clone()),
+		&key_store,
+		&metrics,
+	)
+	.await
 }
 
 // Spawns beefy voters. Returns a future to spawn on the runtime.
@@ -1065,32 +1057,7 @@ async fn should_initialize_voter_at_custom_genesis() {
 	net.peer(0).client().as_client().finalize_block(hashes[8], None).unwrap();
 
 	// load persistent state - nothing in DB, should init at genesis
-	//
-	// NOTE: code from `voter_init_setup()` is moved here because the new network event system
-	// doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the
-	// first `GossipEngine`
-	let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
-	let (gossip_validator, _) = GossipValidator::new(known_peers);
-	let gossip_validator = Arc::new(gossip_validator);
-	let mut gossip_engine = sc_network_gossip::GossipEngine::new(
-		net.peer(0).network_service().clone(),
-		net.peer(0).sync_service().clone(),
-		net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(),
-		"/beefy/whatever",
-		gossip_validator,
-		None,
-	);
-	let (beefy_genesis, best_grandpa) =
-		wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
-	let mut worker_base = BeefyWorkerBase {
-		backend: backend.clone(),
-		runtime: Arc::new(api),
-		key_store: None.into(),
-		metrics: None,
-		_phantom: Default::default(),
-	};
-	let persisted_state =
-		worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
+	let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
 
 	// Test initialization at session boundary.
 	// verify voter initialized with single session starting at block `custom_pallet_genesis` (7)
@@ -1120,18 +1087,7 @@ async fn should_initialize_voter_at_custom_genesis() {
 
 	net.peer(0).client().as_client().finalize_block(hashes[10], None).unwrap();
 	// load persistent state - state preset in DB, but with different pallet genesis
-	// the network state persists and uses the old `GossipEngine` initialized for `peer(0)`
-	let (beefy_genesis, best_grandpa) =
-		wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
-	let mut worker_base = BeefyWorkerBase {
-		backend: backend.clone(),
-		runtime: Arc::new(api),
-		key_store: None.into(),
-		metrics: None,
-		_phantom: Default::default(),
-	};
-	let new_persisted_state =
-		worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
+	let new_persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
 
 	// verify voter initialized with single session starting at block `new_pallet_genesis` (10)
 	let sessions = new_persisted_state.voting_oracle().sessions();
@@ -1322,32 +1278,7 @@ async fn should_catch_up_when_loading_saved_voter_state() {
 	let api = TestApi::with_validator_set(&validator_set);
 
 	// load persistent state - nothing in DB, should init at genesis
-	//
-	// NOTE: code from `voter_init_setup()` is moved here because the new network event system
-	// doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the
-	// first `GossipEngine`
-	let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
-	let (gossip_validator, _) = GossipValidator::new(known_peers);
-	let gossip_validator = Arc::new(gossip_validator);
-	let mut gossip_engine = sc_network_gossip::GossipEngine::new(
-		net.peer(0).network_service().clone(),
-		net.peer(0).sync_service().clone(),
-		net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(),
-		"/beefy/whatever",
-		gossip_validator,
-		None,
-	);
-	let (beefy_genesis, best_grandpa) =
-		wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
-	let mut worker_base = BeefyWorkerBase {
-		backend: backend.clone(),
-		runtime: Arc::new(api.clone()),
-		key_store: None.into(),
-		metrics: None,
-		_phantom: Default::default(),
-	};
-	let persisted_state =
-		worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
+	let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
 
 	// Test initialization at session boundary.
 	// verify voter initialized with two sessions starting at blocks 1 and 10
@@ -1374,18 +1305,7 @@ async fn should_catch_up_when_loading_saved_voter_state() {
 	// finalize 25 without justifications
 	net.peer(0).client().as_client().finalize_block(hashes[25], None).unwrap();
 	// load persistent state - state preset in DB
-	// the network state persists and uses the old `GossipEngine` initialized for `peer(0)`
-	let (beefy_genesis, best_grandpa) =
-		wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
-	let mut worker_base = BeefyWorkerBase {
-		backend: backend.clone(),
-		runtime: Arc::new(api),
-		key_store: None.into(),
-		metrics: None,
-		_phantom: Default::default(),
-	};
-	let persisted_state =
-		worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();
+	let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
 
 	// Verify voter initialized with old sessions plus a new one starting at block 20.
 	// There shouldn't be any duplicates.
diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs
index d7a229003cc46e7a33f2b9aa72242c734194d612..c8eb19621ba5a6e45ea2be93e1e2e830492a9b81 100644
--- a/substrate/client/consensus/beefy/src/worker.rs
+++ b/substrate/client/consensus/beefy/src/worker.rs
@@ -17,46 +17,42 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 use crate::{
-	aux_schema,
 	communication::{
-		gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator},
+		gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage},
 		peers::PeerReport,
-		request_response::outgoing_requests_engine::{OnDemandJustificationsEngine, ResponseInfo},
+		request_response::outgoing_requests_engine::ResponseInfo,
 	},
 	error::Error,
-	expect_validator_set,
+	find_authorities_change,
 	justification::BeefyVersionedFinalityProof,
 	keystore::BeefyKeystore,
 	metric_inc, metric_set,
 	metrics::VoterMetrics,
 	round::{Rounds, VoteImportResult},
-	wait_for_parent_header, BeefyVoterLinks, HEADER_SYNC_DELAY, LOG_TARGET,
+	BeefyComms, BeefyVoterLinks, LOG_TARGET,
 };
 use codec::{Codec, Decode, DecodeAll, Encode};
 use futures::{stream::Fuse, FutureExt, StreamExt};
 use log::{debug, error, info, log_enabled, trace, warn};
 use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend};
-use sc_network_gossip::GossipEngine;
-use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver};
+use sc_utils::notification::NotificationReceiver;
 use sp_api::ProvideRuntimeApi;
 use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
-use sp_blockchain::Backend as BlockchainBackend;
 use sp_consensus::SyncOracle;
 use sp_consensus_beefy::{
 	check_equivocation_proof,
 	ecdsa_crypto::{AuthorityId, Signature},
-	BeefyApi, BeefySignatureHasher, Commitment, ConsensusLog, EquivocationProof, PayloadProvider,
-	ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
+	BeefyApi, BeefySignatureHasher, Commitment, EquivocationProof, PayloadProvider, ValidatorSet,
+	VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
 };
 use sp_runtime::{
-	generic::{BlockId, OpaqueDigestItemId},
+	generic::BlockId,
 	traits::{Block, Header, NumberFor, Zero},
 	SaturatedConversion,
 };
 use std::{
 	collections::{BTreeMap, BTreeSet, VecDeque},
 	fmt::Debug,
-	marker::PhantomData,
 	sync::Arc,
 };
 
@@ -180,8 +176,8 @@ impl<B: Block> VoterOracle<B> {
 		}
 	}
 
-	// Check if an observed session can be added to the Oracle.
-	fn can_add_session(&self, session_start: NumberFor<B>) -> bool {
+	/// Check if an observed session can be added to the Oracle.
+	pub fn can_add_session(&self, session_start: NumberFor<B>) -> bool {
 		let latest_known_session_start =
 			self.sessions.back().map(|session| session.session_start());
 		Some(session_start) > latest_known_session_start
@@ -319,229 +315,28 @@ impl<B: Block> PersistedState<B> {
 		self.voting_oracle.best_grandpa_block_header = best_grandpa;
 	}
 
+	pub fn voting_oracle(&self) -> &VoterOracle<B> {
+		&self.voting_oracle
+	}
+
 	pub(crate) fn gossip_filter_config(&self) -> Result<GossipFilterCfg<B>, Error> {
 		let (start, end) = self.voting_oracle.accepted_interval()?;
 		let validator_set = self.voting_oracle.current_validator_set()?;
 		Ok(GossipFilterCfg { start, end, validator_set })
 	}
-}
-
-/// Helper object holding BEEFY worker communication/gossip components.
-///
-/// These are created once, but will be reused if worker is restarted/reinitialized.
-pub(crate) struct BeefyComms<B: Block> {
-	pub gossip_engine: GossipEngine<B>,
-	pub gossip_validator: Arc<GossipValidator<B>>,
-	pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
-	pub on_demand_justifications: OnDemandJustificationsEngine<B>,
-}
-
-pub(crate) struct BeefyWorkerBase<B: Block, BE, RuntimeApi> {
-	// utilities
-	pub backend: Arc<BE>,
-	pub runtime: Arc<RuntimeApi>,
-	pub key_store: BeefyKeystore<AuthorityId>,
-
-	/// BEEFY client metrics.
-	pub metrics: Option<VoterMetrics>,
-
-	pub _phantom: PhantomData<B>,
-}
-
-impl<B, BE, R> BeefyWorkerBase<B, BE, R>
-where
-	B: Block + Codec,
-	BE: Backend<B>,
-	R: ProvideRuntimeApi<B>,
-	R::Api: BeefyApi<B, AuthorityId>,
-{
-	// If no persisted state present, walk back the chain from first GRANDPA notification to either:
-	//  - latest BEEFY finalized block, or if none found on the way,
-	//  - BEEFY pallet genesis;
-	// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to
-	// finalize.
-	async fn init_state(
-		&self,
-		beefy_genesis: NumberFor<B>,
-		best_grandpa: <B as Block>::Header,
-		min_block_delta: u32,
-	) -> Result<PersistedState<B>, Error> {
-		let blockchain = self.backend.blockchain();
-
-		let beefy_genesis = self
-			.runtime
-			.runtime_api()
-			.beefy_genesis(best_grandpa.hash())
-			.ok()
-			.flatten()
-			.filter(|genesis| *genesis == beefy_genesis)
-			.ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?;
-		// Walk back the imported blocks and initialize voter either, at the last block with
-		// a BEEFY justification, or at pallet genesis block; voter will resume from there.
-		let mut sessions = VecDeque::new();
-		let mut header = best_grandpa.clone();
-		let state = loop {
-			if let Some(true) = blockchain
-				.justifications(header.hash())
-				.ok()
-				.flatten()
-				.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
-			{
-				debug!(
-					target: LOG_TARGET,
-					"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
-					*header.number()
-				);
-				let best_beefy = *header.number();
-				// If no session boundaries detected so far, just initialize new rounds here.
-				if sessions.is_empty() {
-					let active_set =
-						expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header)
-							.await?;
-					let mut rounds = Rounds::new(best_beefy, active_set);
-					// Mark the round as already finalized.
-					rounds.conclude(best_beefy);
-					sessions.push_front(rounds);
-				}
-				let state = PersistedState::checked_new(
-					best_grandpa,
-					best_beefy,
-					sessions,
-					min_block_delta,
-					beefy_genesis,
-				)
-				.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?;
-				break state
-			}
-
-			if *header.number() == beefy_genesis {
-				// We've reached BEEFY genesis, initialize voter here.
-				let genesis_set =
-					expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header)
-						.await?;
-				info!(
-					target: LOG_TARGET,
-					"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
-					Starting voting rounds at block {:?}, genesis validator set {:?}.",
-					beefy_genesis,
-					genesis_set,
-				);
-
-				sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
-				break PersistedState::checked_new(
-					best_grandpa,
-					Zero::zero(),
-					sessions,
-					min_block_delta,
-					beefy_genesis,
-				)
-				.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?
-			}
-
-			if let Some(active) = find_authorities_change::<B>(&header) {
-				debug!(
-					target: LOG_TARGET,
-					"🥩 Marking block {:?} as BEEFY Mandatory.",
-					*header.number()
-				);
-				sessions.push_front(Rounds::new(*header.number(), active));
-			}
-
-			// Move up the chain.
-			header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
-		};
-
-		aux_schema::write_current_version(self.backend.as_ref())?;
-		aux_schema::write_voter_state(self.backend.as_ref(), &state)?;
-		Ok(state)
-	}
-
-	pub async fn load_or_init_state(
-		&mut self,
-		beefy_genesis: NumberFor<B>,
-		best_grandpa: <B as Block>::Header,
-		min_block_delta: u32,
-	) -> Result<PersistedState<B>, Error> {
-		// Initialize voter state from AUX DB if compatible.
-		if let Some(mut state) = crate::aux_schema::load_persistent(self.backend.as_ref())?
-			// Verify state pallet genesis matches runtime.
-			.filter(|state| state.pallet_genesis() == beefy_genesis)
-		{
-			// Overwrite persisted state with current best GRANDPA block.
-			state.set_best_grandpa(best_grandpa.clone());
-			// Overwrite persisted data with newly provided `min_block_delta`.
-			state.set_min_block_delta(min_block_delta);
-			debug!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
-
-			// Make sure that all the headers that we need have been synced.
-			let mut new_sessions = vec![];
-			let mut header = best_grandpa.clone();
-			while *header.number() > state.best_beefy() {
-				if state.voting_oracle.can_add_session(*header.number()) {
-					if let Some(active) = find_authorities_change::<B>(&header) {
-						new_sessions.push((active, *header.number()));
-					}
-				}
-				header =
-					wait_for_parent_header(self.backend.blockchain(), header, HEADER_SYNC_DELAY)
-						.await?;
-			}
-
-			// Make sure we didn't miss any sessions during node restart.
-			for (validator_set, new_session_start) in new_sessions.drain(..).rev() {
-				debug!(
-					target: LOG_TARGET,
-					"🥩 Handling missed BEEFY session after node restart: {:?}.",
-					new_session_start
-				);
-				self.init_session_at(&mut state, validator_set, new_session_start);
-			}
-			return Ok(state)
-		}
-
-		// No valid voter-state persisted, re-initialize from pallet genesis.
-		self.init_state(beefy_genesis, best_grandpa, min_block_delta).await
-	}
-
-	/// Verify `active` validator set for `block` against the key store
-	///
-	/// We want to make sure that we have _at least one_ key in our keystore that
-	/// is part of the validator set, that's because if there are no local keys
-	/// then we can't perform our job as a validator.
-	///
-	/// Note that for a non-authority node there will be no keystore, and we will
-	/// return an error and don't check. The error can usually be ignored.
-	fn verify_validator_set(
-		&self,
-		block: &NumberFor<B>,
-		active: &ValidatorSet<AuthorityId>,
-	) -> Result<(), Error> {
-		let active: BTreeSet<&AuthorityId> = active.validators().iter().collect();
-
-		let public_keys = self.key_store.public_keys()?;
-		let store: BTreeSet<&AuthorityId> = public_keys.iter().collect();
-
-		if store.intersection(&active).count() == 0 {
-			let msg = "no authority public key found in store".to_string();
-			debug!(target: LOG_TARGET, "🥩 for block {:?} {}", block, msg);
-			metric_inc!(self.metrics, beefy_no_authority_found_in_store);
-			Err(Error::Keystore(msg))
-		} else {
-			Ok(())
-		}
-	}
 
 	/// Handle session changes by starting new voting round for mandatory blocks.
-	fn init_session_at(
+	pub fn init_session_at(
 		&mut self,
-		persisted_state: &mut PersistedState<B>,
-		validator_set: ValidatorSet<AuthorityId>,
 		new_session_start: NumberFor<B>,
+		validator_set: ValidatorSet<AuthorityId>,
+		key_store: &BeefyKeystore<AuthorityId>,
+		metrics: &Option<VoterMetrics>,
 	) {
 		debug!(target: LOG_TARGET, "🥩 New active validator set: {:?}", validator_set);
 
 		// BEEFY should finalize a mandatory block during each session.
-		if let Ok(active_session) = persisted_state.voting_oracle.active_rounds() {
+		if let Ok(active_session) = self.voting_oracle.active_rounds() {
 			if !active_session.mandatory_done() {
 				debug!(
 					target: LOG_TARGET,
@@ -549,20 +344,20 @@ where
 					validator_set.id(),
 					active_session.validator_set_id(),
 				);
-				metric_inc!(self.metrics, beefy_lagging_sessions);
+				metric_inc!(metrics, beefy_lagging_sessions);
 			}
 		}
 
 		if log_enabled!(target: LOG_TARGET, log::Level::Debug) {
 			// verify the new validator set - only do it if we're also logging the warning
-			let _ = self.verify_validator_set(&new_session_start, &validator_set);
+			if verify_validator_set::<B>(&new_session_start, &validator_set, key_store).is_err() {
+				metric_inc!(metrics, beefy_no_authority_found_in_store);
+			}
 		}
 
 		let id = validator_set.id();
-		persisted_state
-			.voting_oracle
-			.add_session(Rounds::new(new_session_start, validator_set));
-		metric_set!(self.metrics, beefy_validator_set_id, id);
+		self.voting_oracle.add_session(Rounds::new(new_session_start, validator_set));
+		metric_set!(metrics, beefy_validator_set_id, id);
 		info!(
 			target: LOG_TARGET,
 			"🥩 New Rounds for validator set id: {:?} with session_start {:?}",
@@ -574,9 +369,10 @@ where
 
 /// A BEEFY worker/voter that follows the BEEFY protocol
 pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
-	pub base: BeefyWorkerBase<B, BE, RuntimeApi>,
-
-	// utils
+	// utilities
+	pub backend: Arc<BE>,
+	pub runtime: Arc<RuntimeApi>,
+	pub key_store: BeefyKeystore<AuthorityId>,
 	pub payload_provider: P,
 	pub sync: Arc<S>,
 
@@ -592,6 +388,8 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
 	pub pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
 	/// Persisted voter state.
 	pub persisted_state: PersistedState<B>,
+	/// BEEFY voter metrics
+	pub metrics: Option<VoterMetrics>,
 }
 
 impl<B, BE, P, R, S> BeefyWorker<B, BE, P, R, S>
@@ -622,8 +420,12 @@ where
 		validator_set: ValidatorSet<AuthorityId>,
 		new_session_start: NumberFor<B>,
 	) {
-		self.base
-			.init_session_at(&mut self.persisted_state, validator_set, new_session_start);
+		self.persisted_state.init_session_at(
+			new_session_start,
+			validator_set,
+			&self.key_store,
+			&self.metrics,
+		);
 	}
 
 	fn handle_finality_notification(
@@ -639,8 +441,7 @@ where
 			notification.tree_route,
 		);
 
-		self.base
-			.runtime
+		self.runtime
 			.runtime_api()
 			.beefy_genesis(header.hash())
 			.ok()
@@ -654,7 +455,7 @@ where
 			self.persisted_state.set_best_grandpa(header.clone());
 
 			// Check all (newly) finalized blocks for new session(s).
-			let backend = self.base.backend.clone();
+			let backend = self.backend.clone();
 			for header in notification
 				.tree_route
 				.iter()
@@ -673,7 +474,7 @@ where
 			}
 
 			if new_session_added {
-				crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
+				crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
 					.map_err(|e| Error::Backend(e.to_string()))?;
 			}
 
@@ -707,7 +508,7 @@ where
 						true,
 					);
 				},
-			RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_votes),
+			RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_votes),
 			RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
 		};
 		Ok(())
@@ -727,23 +528,23 @@ where
 		match self.voting_oracle().triage_round(block_num)? {
 			RoundAction::Process => {
 				debug!(target: LOG_TARGET, "🥩 Process justification for round: {:?}.", block_num);
-				metric_inc!(self.base.metrics, beefy_imported_justifications);
+				metric_inc!(self.metrics, beefy_imported_justifications);
 				self.finalize(justification)?
 			},
 			RoundAction::Enqueue => {
 				debug!(target: LOG_TARGET, "🥩 Buffer justification for round: {:?}.", block_num);
 				if self.pending_justifications.len() < MAX_BUFFERED_JUSTIFICATIONS {
 					self.pending_justifications.entry(block_num).or_insert(justification);
-					metric_inc!(self.base.metrics, beefy_buffered_justifications);
+					metric_inc!(self.metrics, beefy_buffered_justifications);
 				} else {
-					metric_inc!(self.base.metrics, beefy_buffered_justifications_dropped);
+					metric_inc!(self.metrics, beefy_buffered_justifications_dropped);
 					warn!(
 						target: LOG_TARGET,
 						"🥩 Buffer justification dropped for round: {:?}.", block_num
 					);
 				}
 			},
-			RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_justifications),
+			RoundAction::Drop => metric_inc!(self.metrics, beefy_stale_justifications),
 		};
 		Ok(())
 	}
@@ -765,7 +566,7 @@ where
 				// We created the `finality_proof` and know to be valid.
 				// New state is persisted after finalization.
 				self.finalize(finality_proof.clone())?;
-				metric_inc!(self.base.metrics, beefy_good_votes_processed);
+				metric_inc!(self.metrics, beefy_good_votes_processed);
 				return Ok(Some(finality_proof))
 			},
 			VoteImportResult::Ok => {
@@ -776,20 +577,17 @@ where
 					.map(|(mandatory_num, _)| mandatory_num == block_number)
 					.unwrap_or(false)
 				{
-					crate::aux_schema::write_voter_state(
-						&*self.base.backend,
-						&self.persisted_state,
-					)
-					.map_err(|e| Error::Backend(e.to_string()))?;
+					crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
+						.map_err(|e| Error::Backend(e.to_string()))?;
 				}
-				metric_inc!(self.base.metrics, beefy_good_votes_processed);
+				metric_inc!(self.metrics, beefy_good_votes_processed);
 			},
 			VoteImportResult::Equivocation(proof) => {
-				metric_inc!(self.base.metrics, beefy_equivocation_votes);
+				metric_inc!(self.metrics, beefy_equivocation_votes);
 				self.report_equivocation(proof)?;
 			},
-			VoteImportResult::Invalid => metric_inc!(self.base.metrics, beefy_invalid_votes),
-			VoteImportResult::Stale => metric_inc!(self.base.metrics, beefy_stale_votes),
+			VoteImportResult::Invalid => metric_inc!(self.metrics, beefy_invalid_votes),
+			VoteImportResult::Stale => metric_inc!(self.metrics, beefy_stale_votes),
 		};
 		Ok(None)
 	}
@@ -816,15 +614,14 @@ where
 
 		// Set new best BEEFY block number.
 		self.persisted_state.set_best_beefy(block_num);
-		crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
+		crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
 			.map_err(|e| Error::Backend(e.to_string()))?;
 
-		metric_set!(self.base.metrics, beefy_best_block, block_num);
+		metric_set!(self.metrics, beefy_best_block, block_num);
 
 		self.comms.on_demand_justifications.cancel_requests_older_than(block_num);
 
 		if let Err(e) = self
-			.base
 			.backend
 			.blockchain()
 			.expect_block_hash_from_id(&BlockId::Number(block_num))
@@ -834,8 +631,7 @@ where
 					.notify(|| Ok::<_, ()>(hash))
 					.expect("forwards closure result; the closure always returns Ok; qed.");
 
-				self.base
-					.backend
+				self.backend
 					.append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode()))
 			}) {
 			debug!(
@@ -872,13 +668,13 @@ where
 
 			for (num, justification) in justifs_to_process.into_iter() {
 				debug!(target: LOG_TARGET, "🥩 Handle buffered justification for: {:?}.", num);
-				metric_inc!(self.base.metrics, beefy_imported_justifications);
+				metric_inc!(self.metrics, beefy_imported_justifications);
 				if let Err(err) = self.finalize(justification) {
 					error!(target: LOG_TARGET, "🥩 Error finalizing block: {}", err);
 				}
 			}
 			metric_set!(
-				self.base.metrics,
+				self.metrics,
 				beefy_buffered_justifications,
 				self.pending_justifications.len()
 			);
@@ -890,7 +686,7 @@ where
 	fn try_to_vote(&mut self) -> Result<(), Error> {
 		// Vote if there's now a new vote target.
 		if let Some(target) = self.voting_oracle().voting_target() {
-			metric_set!(self.base.metrics, beefy_should_vote_on, target);
+			metric_set!(self.metrics, beefy_should_vote_on, target);
 			if target > self.persisted_state.best_voted {
 				self.do_vote(target)?;
 			}
@@ -910,7 +706,6 @@ where
 			self.persisted_state.voting_oracle.best_grandpa_block_header.clone()
 		} else {
 			let hash = self
-				.base
 				.backend
 				.blockchain()
 				.expect_block_hash_from_id(&BlockId::Number(target_number))
@@ -922,7 +717,7 @@ where
 					Error::Backend(err_msg)
 				})?;
 
-			self.base.backend.blockchain().expect_header(hash).map_err(|err| {
+			self.backend.blockchain().expect_header(hash).map_err(|err| {
 				let err_msg = format!(
 					"Couldn't get header for block #{:?} ({:?}) (error: {:?}), skipping vote..",
 					target_number, hash, err
@@ -942,7 +737,7 @@ where
 		let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?;
 		let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id());
 
-		let authority_id = if let Some(id) = self.base.key_store.authority_id(validators) {
+		let authority_id = if let Some(id) = self.key_store.authority_id(validators) {
 			debug!(target: LOG_TARGET, "🥩 Local authority id: {:?}", id);
 			id
 		} else {
@@ -956,7 +751,7 @@ where
 		let commitment = Commitment { payload, block_number: target_number, validator_set_id };
 		let encoded_commitment = commitment.encode();
 
-		let signature = match self.base.key_store.sign(&authority_id, &encoded_commitment) {
+		let signature = match self.key_store.sign(&authority_id, &encoded_commitment) {
 			Ok(sig) => sig,
 			Err(err) => {
 				warn!(target: LOG_TARGET, "🥩 Error signing commitment: {:?}", err);
@@ -981,7 +776,7 @@ where
 				.gossip_engine
 				.gossip_message(proofs_topic::<B>(), encoded_proof, true);
 		} else {
-			metric_inc!(self.base.metrics, beefy_votes_sent);
+			metric_inc!(self.metrics, beefy_votes_sent);
 			debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
 			let encoded_vote = GossipMessage::<B>::Vote(vote).encode();
 			self.comms.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
@@ -989,8 +784,8 @@ where
 
 		// Persist state after vote to avoid double voting in case of voter restarts.
 		self.persisted_state.best_voted = target_number;
-		metric_set!(self.base.metrics, beefy_best_voted, target_number);
-		crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state)
+		metric_set!(self.metrics, beefy_best_voted, target_number);
+		crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
 			.map_err(|e| Error::Backend(e.to_string()))
 	}
 
@@ -1164,7 +959,7 @@ where
 		if !check_equivocation_proof::<_, _, BeefySignatureHasher>(&proof) {
 			debug!(target: LOG_TARGET, "🥩 Skip report for bad equivocation {:?}", proof);
 			return Ok(())
-		} else if let Some(local_id) = self.base.key_store.authority_id(validators) {
+		} else if let Some(local_id) = self.key_store.authority_id(validators) {
 			if offender_id == local_id {
 				warn!(target: LOG_TARGET, "🥩 Skip equivocation report for own equivocation");
 				return Ok(())
@@ -1173,7 +968,6 @@ where
 
 		let number = *proof.round_number();
 		let hash = self
-			.base
 			.backend
 			.blockchain()
 			.expect_block_hash_from_id(&BlockId::Number(number))
@@ -1184,7 +978,7 @@ where
 				);
 				Error::Backend(err_msg)
 			})?;
-		let runtime_api = self.base.runtime.runtime_api();
+		let runtime_api = self.runtime.runtime_api();
 		// generate key ownership proof at that block
 		let key_owner_proof = match runtime_api
 			.generate_key_ownership_proof(hash, validator_set_id, offender_id)
@@ -1201,7 +995,7 @@ where
 		};
 
 		// submit equivocation report at **best** block
-		let best_block_hash = self.base.backend.blockchain().info().best_hash;
+		let best_block_hash = self.backend.blockchain().info().best_hash;
 		runtime_api
 			.submit_report_equivocation_unsigned_extrinsic(best_block_hash, proof, key_owner_proof)
 			.map_err(Error::RuntimeApi)?;
@@ -1210,21 +1004,6 @@ where
 	}
 }
 
-/// Scan the `header` digest log for a BEEFY validator set change. Return either the new
-/// validator set or `None` in case no validator set change has been signaled.
-pub(crate) fn find_authorities_change<B>(header: &B::Header) -> Option<ValidatorSet<AuthorityId>>
-where
-	B: Block,
-{
-	let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID);
-
-	let filter = |log: ConsensusLog<AuthorityId>| match log {
-		ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set),
-		_ => None,
-	};
-	header.digest().convert_first(|l| l.try_to(id).and_then(filter))
-}
-
 /// Calculate next block number to vote on.
 ///
 /// Return `None` if there is no votable target yet.
@@ -1261,11 +1040,42 @@ where
 	}
 }
 
+/// Verify `active` validator set for `block` against the key store
+///
+/// We want to make sure that we have _at least one_ key in our keystore that
+/// is part of the validator set, that's because if there are no local keys
+/// then we can't perform our job as a validator.
+///
+/// Note that for a non-authority node there will be no keystore, and we will
+/// return an error and don't check. The error can usually be ignored.
+fn verify_validator_set<B: Block>(
+	block: &NumberFor<B>,
+	active: &ValidatorSet<AuthorityId>,
+	key_store: &BeefyKeystore<AuthorityId>,
+) -> Result<(), Error> {
+	let active: BTreeSet<&AuthorityId> = active.validators().iter().collect();
+
+	let public_keys = key_store.public_keys()?;
+	let store: BTreeSet<&AuthorityId> = public_keys.iter().collect();
+
+	if store.intersection(&active).count() == 0 {
+		let msg = "no authority public key found in store".to_string();
+		debug!(target: LOG_TARGET, "🥩 for block {:?} {}", block, msg);
+		Err(Error::Keystore(msg))
+	} else {
+		Ok(())
+	}
+}
+
 #[cfg(test)]
 pub(crate) mod tests {
 	use super::*;
 	use crate::{
-		communication::notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream},
+		communication::{
+			gossip::GossipValidator,
+			notification::{BeefyBestBlockStream, BeefyVersionedFinalityProofStream},
+			request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
+		},
 		tests::{
 			create_beefy_keystore, get_beefy_streams, make_beefy_ids, BeefyPeer, BeefyTestNet,
 			TestApi,
@@ -1275,6 +1085,7 @@ pub(crate) mod tests {
 	use futures::{future::poll_fn, task::Poll};
 	use parking_lot::Mutex;
 	use sc_client_api::{Backend as BackendT, HeaderBackend};
+	use sc_network_gossip::GossipEngine;
 	use sc_network_sync::SyncingService;
 	use sc_network_test::TestNetFactory;
 	use sp_blockchain::Backend as BlockchainBackendT;
@@ -1283,7 +1094,7 @@ pub(crate) mod tests {
 		known_payloads::MMR_ROOT_ID,
 		mmr::MmrRootProvider,
 		test_utils::{generate_equivocation_proof, Keyring},
-		Payload, SignedCommitment,
+		ConsensusLog, Payload, SignedCommitment,
 	};
 	use sp_runtime::traits::{Header as HeaderT, One};
 	use substrate_test_runtime_client::{
@@ -1292,10 +1103,6 @@ pub(crate) mod tests {
 	};
 
 	impl<B: super::Block> PersistedState<B> {
-		pub fn voting_oracle(&self) -> &VoterOracle<B> {
-			&self.voting_oracle
-		}
-
 		pub fn active_round(&self) -> Result<&Rounds<B>, Error> {
 			self.voting_oracle.active_rounds()
 		}
@@ -1391,13 +1198,10 @@ pub(crate) mod tests {
 			on_demand_justifications,
 		};
 		BeefyWorker {
-			base: BeefyWorkerBase {
-				backend,
-				runtime: api,
-				key_store: Some(keystore).into(),
-				metrics,
-				_phantom: Default::default(),
-			},
+			backend,
+			runtime: api,
+			key_store: Some(keystore).into(),
+			metrics,
 			payload_provider,
 			sync: Arc::new(sync),
 			links,
@@ -1675,19 +1479,22 @@ pub(crate) mod tests {
 		let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
 
 		// keystore doesn't contain other keys than validators'
-		assert_eq!(worker.base.verify_validator_set(&1, &validator_set), Ok(()));
+		assert_eq!(verify_validator_set::<Block>(&1, &validator_set, &worker.key_store), Ok(()));
 
 		// unknown `Bob` key
 		let keys = &[Keyring::Bob];
 		let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap();
 		let err_msg = "no authority public key found in store".to_string();
 		let expected = Err(Error::Keystore(err_msg));
-		assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected);
+		assert_eq!(verify_validator_set::<Block>(&1, &validator_set, &worker.key_store), expected);
 
 		// worker has no keystore
-		worker.base.key_store = None.into();
+		worker.key_store = None.into();
 		let expected_err = Err(Error::Keystore("no Keystore".into()));
-		assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected_err);
+		assert_eq!(
+			verify_validator_set::<Block>(&1, &validator_set, &worker.key_store),
+			expected_err
+		);
 	}
 
 	#[tokio::test]
@@ -1839,7 +1646,7 @@ pub(crate) mod tests {
 
 		let mut net = BeefyTestNet::new(1);
 		let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone());
-		worker.base.runtime = api_alice.clone();
+		worker.runtime = api_alice.clone();
 
 		// let there be a block with num = 1:
 		let _ = net.peer(0).push_blocks(1, false);
diff --git a/substrate/primitives/consensus/beefy/src/commitment.rs b/substrate/primitives/consensus/beefy/src/commitment.rs
index 1f0fb34ebf10b3e357429bf9bb501677dbea4695..335c6b604f044872ca5dc20319d0e2de94313606 100644
--- a/substrate/primitives/consensus/beefy/src/commitment.rs
+++ b/substrate/primitives/consensus/beefy/src/commitment.rs
@@ -97,6 +97,19 @@ pub struct SignedCommitment<TBlockNumber, TSignature> {
 	pub signatures: Vec<Option<TSignature>>,
 }
 
+impl<TBlockNumber: sp_std::fmt::Debug, TSignature> sp_std::fmt::Display
+	for SignedCommitment<TBlockNumber, TSignature>
+{
+	fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result {
+		let signatures_count = self.signatures.iter().filter(|s| s.is_some()).count();
+		write!(
+			f,
+			"SignedCommitment(commitment: {:?}, signatures_count: {})",
+			self.commitment, signatures_count
+		)
+	}
+}
+
 impl<TBlockNumber, TSignature> SignedCommitment<TBlockNumber, TSignature> {
 	/// Return the number of collected signatures.
 	pub fn no_of_signatures(&self) -> usize {
@@ -241,6 +254,14 @@ pub enum VersionedFinalityProof<N, S> {
 	V1(SignedCommitment<N, S>),
 }
 
+impl<N: sp_std::fmt::Debug, S> sp_std::fmt::Display for VersionedFinalityProof<N, S> {
+	fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result {
+		match self {
+			VersionedFinalityProof::V1(sc) => write!(f, "VersionedFinalityProof::V1({})", sc),
+		}
+	}
+}
+
 impl<N, S> From<SignedCommitment<N, S>> for VersionedFinalityProof<N, S> {
 	fn from(commitment: SignedCommitment<N, S>) -> Self {
 		VersionedFinalityProof::V1(commitment)