diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs
index 376172fc23370c45e6c59c044f11e61ddc56a6a8..9be648f8796c3d429601b9efbb29c0382e530411 100644
--- a/substrate/client/consensus/beefy/src/communication/gossip.rs
+++ b/substrate/client/consensus/beefy/src/communication/gossip.rs
@@ -18,7 +18,7 @@
 
 use std::{collections::BTreeMap, sync::Arc, time::Duration};
 
-use sc_network::PeerId;
+use sc_network::{PeerId, ReputationChange};
 use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext};
 use sp_core::hashing::twox_64;
 use sp_runtime::traits::{Block, Hash, Header, NumberFor};
@@ -26,10 +26,14 @@ use sp_runtime::traits::{Block, Hash, Header, NumberFor};
 use codec::{Decode, Encode};
 use log::{debug, trace};
 use parking_lot::{Mutex, RwLock};
+use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
 use wasm_timer::Instant;
 
 use crate::{
-	communication::peers::KnownPeers,
+	communication::{
+		benefit, cost,
+		peers::{KnownPeers, PeerReport},
+	},
 	justification::{
 		proof_block_num_and_set_id, verify_with_validator_set, BeefyVersionedFinalityProof,
 	},
@@ -47,6 +51,27 @@ const REBROADCAST_AFTER: Duration = Duration::from_secs(60);
 #[cfg(test)]
 const REBROADCAST_AFTER: Duration = Duration::from_secs(5);
 
+#[derive(Debug, PartialEq)]
+pub(super) enum Action<H> {
+	// repropagate under given topic, to the given peers, applying cost/benefit to originator.
+	Keep(H, ReputationChange),
+	// discard, applying cost/benefit to originator.
+	Discard(ReputationChange),
+}
+
+/// An outcome of examining a message.
+#[derive(Debug, PartialEq, Clone, Copy)]
+enum Consider {
+	/// Accept the message.
+	Accept,
+	/// Message is too early. Reject.
+	RejectPast,
+	/// Message is from the future. Reject.
+	RejectFuture,
+	/// Message cannot be evaluated. Reject.
+	RejectOutOfScope,
+}
+
 /// BEEFY gossip message type that gets encoded and sent on the network.
 #[derive(Debug, Encode, Decode)]
 pub(crate) enum GossipMessage<B: Block> {
@@ -135,26 +160,47 @@ impl<B: Block> Filter<B> {
 		}
 	}
 
-	/// Return true if `max(session_start, best_beefy) <= round <= best_grandpa`,
+	/// Accept if `max(session_start, best_beefy) <= round <= best_grandpa`,
 	/// and vote `set_id` matches session set id.
 	///
 	/// Latest concluded round is still considered alive to allow proper gossiping for it.
-	fn is_vote_accepted(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> bool {
+	fn consider_vote(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> Consider {
 		self.inner
 			.as_ref()
-			.map(|f| set_id == f.validator_set.id() && round >= f.start && round <= f.end)
-			.unwrap_or(false)
+			.map(|f|
+				// only from current set and only [filter.start, filter.end]
+				if set_id < f.validator_set.id() {
+					Consider::RejectPast
+				} else if set_id > f.validator_set.id() {
+					Consider::RejectFuture
+				} else if round < f.start {
+					Consider::RejectPast
+				} else if round > f.end {
+					Consider::RejectFuture
+				} else {
+					Consider::Accept
+				})
+			.unwrap_or(Consider::RejectOutOfScope)
 	}
 
 	/// Return true if `round` is >= than `max(session_start, best_beefy)`,
 	/// and proof `set_id` matches session set id.
 	///
 	/// Latest concluded round is still considered alive to allow proper gossiping for it.
-	fn is_finality_proof_accepted(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> bool {
+	fn consider_finality_proof(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> Consider {
 		self.inner
 			.as_ref()
-			.map(|f| set_id == f.validator_set.id() && round >= f.start)
-			.unwrap_or(false)
+			.map(|f|
+				// only from current set and only >= filter.start
+				if round < f.start || set_id < f.validator_set.id() {
+					Consider::RejectPast
+				} else if set_id > f.validator_set.id() {
+					Consider::RejectFuture
+				} else {
+					Consider::Accept
+				}
+			)
+			.unwrap_or(Consider::RejectOutOfScope)
 	}
 
 	/// Add new _known_ `hash` to the round's known votes.
@@ -189,20 +235,26 @@ where
 	gossip_filter: RwLock<Filter<B>>,
 	next_rebroadcast: Mutex<Instant>,
 	known_peers: Arc<Mutex<KnownPeers<B>>>,
+	report_sender: TracingUnboundedSender<PeerReport>,
 }
 
 impl<B> GossipValidator<B>
 where
 	B: Block,
 {
-	pub fn new(known_peers: Arc<Mutex<KnownPeers<B>>>) -> GossipValidator<B> {
-		GossipValidator {
+	pub(crate) fn new(
+		known_peers: Arc<Mutex<KnownPeers<B>>>,
+	) -> (GossipValidator<B>, TracingUnboundedReceiver<PeerReport>) {
+		let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 10_000);
+		let val = GossipValidator {
 			votes_topic: votes_topic::<B>(),
 			justifs_topic: proofs_topic::<B>(),
 			gossip_filter: RwLock::new(Filter::new()),
 			next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
 			known_peers,
-		}
+			report_sender: tx,
+		};
+		(val, rx)
 	}
 
 	/// Update gossip validator filter.
@@ -213,12 +265,16 @@ where
 		self.gossip_filter.write().update(filter);
 	}
 
+	fn report(&self, who: PeerId, cost_benefit: ReputationChange) {
+		let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit });
+	}
+
 	fn validate_vote(
 		&self,
 		vote: VoteMessage<NumberFor<B>, AuthorityId, Signature>,
 		sender: &PeerId,
 		data: &[u8],
-	) -> ValidationResult<B::Hash> {
+	) -> Action<B::Hash> {
 		let msg_hash = twox_64(data);
 		let round = vote.commitment.block_number;
 		let set_id = vote.commitment.validator_set_id;
@@ -230,25 +286,37 @@ where
 		{
 			let filter = self.gossip_filter.read();
 
-			if !filter.is_vote_accepted(round, set_id) {
-				return ValidationResult::Discard
+			match filter.consider_vote(round, set_id) {
+				Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
+				Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
+				Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
+				Consider::Accept => {},
 			}
 
 			if filter.is_known_vote(round, &msg_hash) {
-				return ValidationResult::ProcessAndKeep(self.votes_topic)
+				return Action::Keep(self.votes_topic, benefit::KNOWN_VOTE_MESSAGE)
+			}
+
+			// ensure authority is part of the set.
+			if !filter
+				.validator_set()
+				.map(|set| set.validators().contains(&vote.id))
+				.unwrap_or(false)
+			{
+				debug!(target: LOG_TARGET, "Message from voter not in validator set: {}", vote.id);
+				return Action::Discard(cost::UNKNOWN_VOTER)
 			}
 		}
 
 		if BeefyKeystore::verify(&vote.id, &vote.signature, &vote.commitment.encode()) {
 			self.gossip_filter.write().add_known_vote(round, msg_hash);
-			ValidationResult::ProcessAndKeep(self.votes_topic)
+			Action::Keep(self.votes_topic, benefit::VOTE_MESSAGE)
 		} else {
-			// TODO: report peer
 			debug!(
 				target: LOG_TARGET,
 				"🥩 Bad signature on message: {:?}, from: {:?}", vote, sender
 			);
-			ValidationResult::Discard
+			Action::Discard(cost::BAD_SIGNATURE)
 		}
 	}
 
@@ -256,31 +324,38 @@ where
 		&self,
 		proof: BeefyVersionedFinalityProof<B>,
 		sender: &PeerId,
-	) -> ValidationResult<B::Hash> {
+	) -> Action<B::Hash> {
 		let (round, set_id) = proof_block_num_and_set_id::<B>(&proof);
 		self.known_peers.lock().note_vote_for(*sender, round);
 
 		let guard = self.gossip_filter.read();
-		// Verify general usefulness of the justifications.
-		if !guard.is_finality_proof_accepted(round, set_id) {
-			return ValidationResult::Discard
+		// Verify general usefulness of the justification.
+		match guard.consider_finality_proof(round, set_id) {
+			Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
+			Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
+			Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
+			Consider::Accept => {},
 		}
 		// Verify justification signatures.
 		guard
 			.validator_set()
 			.map(|validator_set| {
-				if let Ok(()) = verify_with_validator_set::<B>(round, validator_set, &proof) {
-					ValidationResult::ProcessAndKeep(self.justifs_topic)
-				} else {
-					// TODO: report peer
+				if let Err((_, signatures_checked)) =
+					verify_with_validator_set::<B>(round, validator_set, &proof)
+				{
 					debug!(
 						target: LOG_TARGET,
 						"🥩 Bad signatures on message: {:?}, from: {:?}", proof, sender
 					);
-					ValidationResult::Discard
+					let mut cost = cost::INVALID_PROOF;
+					cost.value +=
+						cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
+					Action::Discard(cost)
+				} else {
+					Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF)
 				}
 			})
-			.unwrap_or(ValidationResult::Discard)
+			.unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE))
 	}
 }
 
@@ -294,15 +369,32 @@ where
 
 	fn validate(
 		&self,
-		_context: &mut dyn ValidatorContext<B>,
+		context: &mut dyn ValidatorContext<B>,
 		sender: &PeerId,
 		mut data: &[u8],
 	) -> ValidationResult<B::Hash> {
-		match GossipMessage::<B>::decode(&mut data) {
-			Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender, data),
+		let raw = data;
+		let action = match GossipMessage::<B>::decode(&mut data) {
+			Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender, raw),
 			Ok(GossipMessage::FinalityProof(proof)) => self.validate_finality_proof(proof, sender),
 			Err(e) => {
 				debug!(target: LOG_TARGET, "Error decoding message: {}", e);
+				let bytes = raw.len().min(i32::MAX as usize) as i32;
+				let cost = ReputationChange::new(
+					bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
+					"BEEFY: Bad packet",
+				);
+				Action::Discard(cost)
+			},
+		};
+		match action {
+			Action::Keep(topic, cb) => {
+				self.report(*sender, cb);
+				context.broadcast_message(topic, data.to_vec(), false);
+				ValidationResult::ProcessAndKeep(topic)
+			},
+			Action::Discard(cb) => {
+				self.report(*sender, cb);
 				ValidationResult::Discard
 			},
 		}
@@ -314,13 +406,13 @@ where
 			Ok(GossipMessage::Vote(msg)) => {
 				let round = msg.commitment.block_number;
 				let set_id = msg.commitment.validator_set_id;
-				let expired = !filter.is_vote_accepted(round, set_id);
+				let expired = filter.consider_vote(round, set_id) != Consider::Accept;
 				trace!(target: LOG_TARGET, "🥩 Vote for round #{} expired: {}", round, expired);
 				expired
 			},
 			Ok(GossipMessage::FinalityProof(proof)) => {
 				let (round, set_id) = proof_block_num_and_set_id::<B>(&proof);
-				let expired = !filter.is_finality_proof_accepted(round, set_id);
+				let expired = filter.consider_finality_proof(round, set_id) != Consider::Accept;
 				trace!(
 					target: LOG_TARGET,
 					"🥩 Finality proof for round #{} expired: {}",
@@ -358,13 +450,13 @@ where
 				Ok(GossipMessage::Vote(msg)) => {
 					let round = msg.commitment.block_number;
 					let set_id = msg.commitment.validator_set_id;
-					let allowed = filter.is_vote_accepted(round, set_id);
+					let allowed = filter.consider_vote(round, set_id) == Consider::Accept;
 					trace!(target: LOG_TARGET, "🥩 Vote for round #{} allowed: {}", round, allowed);
 					allowed
 				},
 				Ok(GossipMessage::FinalityProof(proof)) => {
 					let (round, set_id) = proof_block_num_and_set_id::<B>(&proof);
-					let allowed = filter.is_finality_proof_accepted(round, set_id);
+					let allowed = filter.consider_finality_proof(round, set_id) == Consider::Accept;
 					trace!(
 						target: LOG_TARGET,
 						"🥩 Finality proof for round #{} allowed: {}",
@@ -409,15 +501,16 @@ pub(crate) mod tests {
 		assert_eq!(filter.live_votes.len(), 3);
 
 		assert!(filter.inner.is_none());
-		assert!(!filter.is_vote_accepted(1, 1));
+		assert_eq!(filter.consider_vote(1, 1), Consider::RejectOutOfScope);
 
 		filter.update(GossipFilterCfg { start: 3, end: 10, validator_set: &validator_set });
 		assert_eq!(filter.live_votes.len(), 1);
 		assert!(filter.live_votes.contains_key(&3));
-		assert!(!filter.is_vote_accepted(2, 1));
-		assert!(filter.is_vote_accepted(3, 1));
-		assert!(filter.is_vote_accepted(4, 1));
-		assert!(!filter.is_vote_accepted(4, 2));
+		assert_eq!(filter.consider_vote(2, 1), Consider::RejectPast);
+		assert_eq!(filter.consider_vote(3, 1), Consider::Accept);
+		assert_eq!(filter.consider_vote(4, 1), Consider::Accept);
+		assert_eq!(filter.consider_vote(20, 1), Consider::RejectFuture);
+		assert_eq!(filter.consider_vote(4, 2), Consider::RejectFuture);
 
 		let validator_set = ValidatorSet::<AuthorityId>::new(keys, 2).unwrap();
 		filter.update(GossipFilterCfg { start: 5, end: 10, validator_set: &validator_set });
@@ -430,9 +523,7 @@ pub(crate) mod tests {
 			todo!()
 		}
 
-		fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec<u8>, _force: bool) {
-			todo!()
-		}
+		fn broadcast_message(&mut self, _topic: B::Hash, _message: Vec<u8>, _force: bool) {}
 
 		fn send_message(&mut self, _who: &sc_network::PeerId, _message: Vec<u8>) {
 			todo!()
@@ -485,18 +576,39 @@ pub(crate) mod tests {
 	fn should_validate_messages() {
 		let keys = vec![Keyring::Alice.public()];
 		let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
-		let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
-		gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
-		let sender = sc_network::PeerId::random();
+		let (gv, mut report_stream) =
+			GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
+		let sender = PeerId::random();
 		let mut context = TestContext;
 
+		// reject message, decoding error
+		let bad_encoding = b"0000000000".as_slice();
+		let expected_cost = ReputationChange::new(
+			(bad_encoding.len() as i32).saturating_mul(cost::PER_UNDECODABLE_BYTE),
+			"BEEFY: Bad packet",
+		);
+		let mut expected_report = PeerReport { who: sender, cost_benefit: expected_cost };
+		let res = gv.validate(&mut context, &sender, bad_encoding);
+		assert!(matches!(res, ValidationResult::Discard));
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
+
+		// verify votes validation
+
 		let vote = dummy_vote(3);
-		let gossip_vote = GossipMessage::<Block>::Vote(vote.clone());
+		let encoded = GossipMessage::<Block>::Vote(vote.clone()).encode();
 
-		// first time the cache should be populated
-		let res = gv.validate(&mut context, &sender, &gossip_vote.encode());
+		// filter not initialized
+		let res = gv.validate(&mut context, &sender, &encoded);
+		assert!(matches!(res, ValidationResult::Discard));
+		expected_report.cost_benefit = cost::OUT_OF_SCOPE_MESSAGE;
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
 
+		gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
+		// nothing in cache first time
+		let res = gv.validate(&mut context, &sender, &encoded);
 		assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
+		expected_report.cost_benefit = benefit::VOTE_MESSAGE;
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
 		assert_eq!(
 			gv.gossip_filter
 				.read()
@@ -507,43 +619,74 @@ pub(crate) mod tests {
 		);
 
 		// second time we should hit the cache
-		let res = gv.validate(&mut context, &sender, &gossip_vote.encode());
+		let res = gv.validate(&mut context, &sender, &encoded);
 		assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
+		expected_report.cost_benefit = benefit::KNOWN_VOTE_MESSAGE;
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
+
+		// reject vote, voter not in validator set
+		let mut bad_vote = vote.clone();
+		bad_vote.id = Keyring::Bob.public();
+		let bad_vote = GossipMessage::<Block>::Vote(bad_vote).encode();
+		let res = gv.validate(&mut context, &sender, &bad_vote);
+		assert!(matches!(res, ValidationResult::Discard));
+		expected_report.cost_benefit = cost::UNKNOWN_VOTER;
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
 
-		// next we should quickly reject if the round is not live
-		gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set });
-
+		// reject if the round is not GRANDPA finalized
+		gv.update_filter(GossipFilterCfg { start: 1, end: 2, validator_set: &validator_set });
 		let number = vote.commitment.block_number;
 		let set_id = vote.commitment.validator_set_id;
-		assert!(!gv.gossip_filter.read().is_vote_accepted(number, set_id));
+		assert_eq!(gv.gossip_filter.read().consider_vote(number, set_id), Consider::RejectFuture);
+		let res = gv.validate(&mut context, &sender, &encoded);
+		assert!(matches!(res, ValidationResult::Discard));
+		expected_report.cost_benefit = cost::FUTURE_MESSAGE;
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
 
-		let res = gv.validate(&mut context, &sender, &vote.encode());
+		// reject if the round is not live anymore
+		gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set });
+		let number = vote.commitment.block_number;
+		let set_id = vote.commitment.validator_set_id;
+		assert_eq!(gv.gossip_filter.read().consider_vote(number, set_id), Consider::RejectPast);
+		let res = gv.validate(&mut context, &sender, &encoded);
 		assert!(matches!(res, ValidationResult::Discard));
+		expected_report.cost_benefit = cost::OUTDATED_MESSAGE;
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
+
+		// now verify proofs validation
 
 		// reject old proof
 		let proof = dummy_proof(5, &validator_set);
 		let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
 		let res = gv.validate(&mut context, &sender, &encoded_proof);
 		assert!(matches!(res, ValidationResult::Discard));
+		expected_report.cost_benefit = cost::OUTDATED_MESSAGE;
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
 
 		// accept next proof with good set_id
 		let proof = dummy_proof(7, &validator_set);
 		let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
 		let res = gv.validate(&mut context, &sender, &encoded_proof);
 		assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
+		expected_report.cost_benefit = benefit::VALIDATED_PROOF;
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
 
 		// accept future proof with good set_id
 		let proof = dummy_proof(20, &validator_set);
 		let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
 		let res = gv.validate(&mut context, &sender, &encoded_proof);
 		assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
+		expected_report.cost_benefit = benefit::VALIDATED_PROOF;
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
 
-		// reject proof, wrong set_id
+		// reject proof, future set_id
 		let bad_validator_set = ValidatorSet::<AuthorityId>::new(keys, 1).unwrap();
 		let proof = dummy_proof(20, &bad_validator_set);
 		let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
 		let res = gv.validate(&mut context, &sender, &encoded_proof);
 		assert!(matches!(res, ValidationResult::Discard));
+		expected_report.cost_benefit = cost::FUTURE_MESSAGE;
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
 
 		// reject proof, bad signatures (Bob instead of Alice)
 		let bad_validator_set =
@@ -552,13 +695,16 @@ pub(crate) mod tests {
 		let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
 		let res = gv.validate(&mut context, &sender, &encoded_proof);
 		assert!(matches!(res, ValidationResult::Discard));
+		expected_report.cost_benefit = cost::INVALID_PROOF;
+		expected_report.cost_benefit.value += cost::PER_SIGNATURE_CHECKED;
+		assert_eq!(report_stream.try_recv().unwrap(), expected_report);
 	}
 
 	#[test]
 	fn messages_allowed_and_expired() {
 		let keys = vec![Keyring::Alice.public()];
 		let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
-		let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
+		let (gv, _) = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
 		gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
 		let sender = sc_network::PeerId::random();
 		let topic = Default::default();
@@ -635,7 +781,7 @@ pub(crate) mod tests {
 	fn messages_rebroadcast() {
 		let keys = vec![Keyring::Alice.public()];
 		let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
-		let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
+		let (gv, _) = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
 		gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
 		let sender = sc_network::PeerId::random();
 		let topic = Default::default();
diff --git a/substrate/client/consensus/beefy/src/communication/mod.rs b/substrate/client/consensus/beefy/src/communication/mod.rs
index 13735a9d3211bb3d715c159f592f39b1928d908c..d8e4d22053628a6838fe65acfe937132484614e0 100644
--- a/substrate/client/consensus/beefy/src/communication/mod.rs
+++ b/substrate/client/consensus/beefy/src/communication/mod.rs
@@ -73,6 +73,39 @@ pub fn beefy_peers_set_config(
 	cfg
 }
 
+// cost scalars for reporting peers.
+mod cost {
+	use sc_network::ReputationChange as Rep;
+	// Message that's for an outdated round.
+	pub(super) const OUTDATED_MESSAGE: Rep = Rep::new(-50, "BEEFY: Past message");
+	// Message that's from the future relative to our current set-id.
+	pub(super) const FUTURE_MESSAGE: Rep = Rep::new(-100, "BEEFY: Future message");
+	// Vote message containing bad signature.
+	pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "BEEFY: Bad signature");
+	// Message received with vote from voter not in validator set.
+	pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "BEEFY: Unknown voter");
+	// A message received that cannot be evaluated relative to our current state.
+	pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "BEEFY: Out-of-scope message");
+	// Message containing invalid proof.
+	pub(super) const INVALID_PROOF: Rep = Rep::new(-5000, "BEEFY: Invalid commit");
+	// Reputation cost per signature checked for invalid proof.
+	pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
+	// Reputation cost per byte for un-decodable message.
+	pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
+	// On-demand request was refused by peer.
+	pub(super) const REFUSAL_RESPONSE: Rep = Rep::new(-100, "BEEFY: Proof request refused");
+	// On-demand request for a proof that can't be found in the backend.
+	pub(super) const UNKOWN_PROOF_REQUEST: Rep = Rep::new(-150, "BEEFY: Unknown proof request");
+}
+
+// benefit scalars for reporting peers.
+mod benefit {
+	use sc_network::ReputationChange as Rep;
+	pub(super) const VOTE_MESSAGE: Rep = Rep::new(100, "BEEFY: Round vote message");
+	pub(super) const KNOWN_VOTE_MESSAGE: Rep = Rep::new(50, "BEEFY: Known vote");
+	pub(super) const VALIDATED_PROOF: Rep = Rep::new(100, "BEEFY: Justification");
+}
+
 #[cfg(test)]
 mod tests {
 	use super::*;
diff --git a/substrate/client/consensus/beefy/src/communication/peers.rs b/substrate/client/consensus/beefy/src/communication/peers.rs
index c2fb06faddf0cd4e3e39b4e4027e6247a1806350..4704b8dcf45765786a4a5a88e1db593114b9dbb9 100644
--- a/substrate/client/consensus/beefy/src/communication/peers.rs
+++ b/substrate/client/consensus/beefy/src/communication/peers.rs
@@ -18,13 +18,17 @@
 
 //! Logic for keeping track of BEEFY peers.
 
-// TODO (issue #12296): replace this naive peer tracking with generic one that infers data
-// from multiple network protocols.
-
-use sc_network::PeerId;
+use sc_network::{PeerId, ReputationChange};
 use sp_runtime::traits::{Block, NumberFor, Zero};
 use std::collections::{HashMap, VecDeque};
 
+/// Report specifying a reputation change for a given peer.
+#[derive(Debug, PartialEq)]
+pub(crate) struct PeerReport {
+	pub who: PeerId,
+	pub cost_benefit: ReputationChange,
+}
+
 struct PeerData<B: Block> {
 	last_voted_on: NumberFor<B>,
 }
diff --git a/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs b/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
index 1670e99828831c2d999584c5ec5dcbf3e5a5a183..d4f4b59f0195e0d273600081e77ba2e774809f51 100644
--- a/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
+++ b/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
@@ -32,9 +32,12 @@ use sp_runtime::traits::Block;
 use std::{marker::PhantomData, sync::Arc};
 
 use crate::{
-	communication::request_response::{
-		on_demand_justifications_protocol_config, Error, JustificationRequest,
-		BEEFY_SYNC_LOG_TARGET,
+	communication::{
+		cost,
+		request_response::{
+			on_demand_justifications_protocol_config, Error, JustificationRequest,
+			BEEFY_SYNC_LOG_TARGET,
+		},
 	},
 	metric_inc,
 	metrics::{register_metrics, OnDemandIncomingRequestsMetrics},
@@ -69,17 +72,20 @@ impl<B: Block> IncomingRequest<B> {
 	/// Params:
 	/// 	- The raw request to decode
 	/// 	- Reputation changes to apply for the peer in case decoding fails.
-	pub fn try_from_raw(
+	pub fn try_from_raw<F>(
 		raw: netconfig::IncomingRequest,
-		reputation_changes: Vec<ReputationChange>,
-	) -> Result<Self, Error> {
+		reputation_changes_on_err: F,
+	) -> Result<Self, Error>
+	where
+		F: FnOnce(usize) -> Vec<ReputationChange>,
+	{
 		let netconfig::IncomingRequest { payload, peer, pending_response } = raw;
 		let payload = match JustificationRequest::decode(&mut payload.as_ref()) {
 			Ok(payload) => payload,
 			Err(err) => {
 				let response = netconfig::OutgoingResponse {
 					result: Err(()),
-					reputation_changes,
+					reputation_changes: reputation_changes_on_err(payload.len()),
 					sent_feedback: None,
 				};
 				if let Err(_) = pending_response.send(response) {
@@ -111,11 +117,11 @@ impl IncomingRequestReceiver {
 	pub async fn recv<B, F>(&mut self, reputation_changes: F) -> Result<IncomingRequest<B>, Error>
 	where
 		B: Block,
-		F: FnOnce() -> Vec<ReputationChange>,
+		F: FnOnce(usize) -> Vec<ReputationChange>,
 	{
 		let req = match self.raw.next().await {
 			None => return Err(Error::RequestChannelExhausted),
-			Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes())?,
+			Some(raw) => IncomingRequest::<B>::try_from_raw(raw, reputation_changes)?,
 		};
 		Ok(req)
 	}
@@ -159,26 +165,20 @@ where
 
 	// Sends back justification response if justification found in client backend.
 	fn handle_request(&self, request: IncomingRequest<B>) -> Result<(), Error> {
-		// TODO (issue #12293): validate `request` and change peer reputation for invalid requests.
-
-		let maybe_encoded_proof = if let Some(hash) =
-			self.client.block_hash(request.payload.begin).map_err(Error::Client)?
-		{
-			self.client
-				.justifications(hash)
-				.map_err(Error::Client)?
-				.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
-				// No BEEFY justification present.
-				.ok_or(())
-		} else {
-			Err(())
-		};
-
+		let mut reputation_changes = vec![];
+		let maybe_encoded_proof = self
+			.client
+			.block_hash(request.payload.begin)
+			.ok()
+			.flatten()
+			.and_then(|hash| self.client.justifications(hash).ok().flatten())
+			.and_then(|justifs| justifs.get(BEEFY_ENGINE_ID).cloned())
+			.ok_or_else(|| reputation_changes.push(cost::UNKOWN_PROOF_REQUEST));
 		request
 			.pending_response
 			.send(netconfig::OutgoingResponse {
 				result: maybe_encoded_proof,
-				reputation_changes: Vec::new(),
+				reputation_changes,
 				sent_feedback: None,
 			})
 			.map_err(|_| Error::SendResponse)
@@ -188,7 +188,17 @@ where
 	pub async fn run(mut self) {
 		trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler");
 
-		while let Ok(request) = self.request_receiver.recv(|| vec![]).await {
+		while let Ok(request) = self
+			.request_receiver
+			.recv(|bytes| {
+				let bytes = bytes.min(i32::MAX as usize) as i32;
+				vec![ReputationChange::new(
+					bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
+					"BEEFY: Bad request payload",
+				)]
+			})
+			.await
+		{
 			let peer = request.peer;
 			match self.handle_request(request) {
 				Ok(()) => {
@@ -199,8 +209,8 @@ where
 					)
 				},
 				Err(e) => {
+					// peer reputation changes already applied in `self.handle_request()`
 					metric_inc!(self, beefy_failed_justification_responses);
-					// TODO (issue #12293): apply reputation changes here based on error type.
 					debug!(
 						target: BEEFY_SYNC_LOG_TARGET,
 						"🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e,
diff --git a/substrate/client/consensus/beefy/src/communication/request_response/mod.rs b/substrate/client/consensus/beefy/src/communication/request_response/mod.rs
index c528d06bbe0c5392411f12244611996d3801a0e9..545ab18cf1d34860bb93febc68db3aa13b7de138 100644
--- a/substrate/client/consensus/beefy/src/communication/request_response/mod.rs
+++ b/substrate/client/consensus/beefy/src/communication/request_response/mod.rs
@@ -30,7 +30,7 @@ use codec::{Decode, Encode, Error as CodecError};
 use sc_network::{config::RequestResponseConfig, PeerId};
 use sp_runtime::traits::{Block, NumberFor};
 
-use crate::communication::beefy_protocol_name::justifications_protocol_name;
+use crate::communication::{beefy_protocol_name::justifications_protocol_name, peers::PeerReport};
 use incoming_requests_handler::IncomingRequestReceiver;
 
 // 10 seems reasonable, considering justifs are explicitly requested only
@@ -76,7 +76,7 @@ pub struct JustificationRequest<B: Block> {
 }
 
 #[derive(Debug, thiserror::Error)]
-pub enum Error {
+pub(crate) enum Error {
 	#[error(transparent)]
 	Client(#[from] sp_blockchain::Error),
 
@@ -99,5 +99,8 @@ pub enum Error {
 	SendResponse,
 
 	#[error("Received invalid response.")]
-	InvalidResponse,
+	InvalidResponse(PeerReport),
+
+	#[error("Internal error while getting response.")]
+	ResponseError,
 }
diff --git a/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs b/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs
index fbf464bd639d9b01010d1b81d4f68eb36e3dcb08..10105ff2d417d5c62a7a64bdc850690a7be8f14b 100644
--- a/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs
+++ b/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs
@@ -31,7 +31,11 @@ use sp_runtime::traits::{Block, NumberFor};
 use std::{collections::VecDeque, result::Result, sync::Arc};
 
 use crate::{
-	communication::request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET},
+	communication::{
+		benefit, cost,
+		peers::PeerReport,
+		request_response::{Error, JustificationRequest, BEEFY_SYNC_LOG_TARGET},
+	},
 	justification::{decode_and_verify_finality_proof, BeefyVersionedFinalityProof},
 	metric_inc,
 	metrics::{register_metrics, OnDemandOutgoingRequestsMetrics},
@@ -54,6 +58,16 @@ enum State<B: Block> {
 	AwaitingResponse(PeerId, RequestInfo<B>, ResponseReceiver),
 }
 
+/// Possible engine responses.
+pub(crate) enum ResponseInfo<B: Block> {
+	/// No peer response available yet.
+	Pending,
+	/// Valid justification provided alongside peer reputation changes.
+	ValidProof(BeefyVersionedFinalityProof<B>, PeerReport),
+	/// No justification yet, only peer reputation changes.
+	PeerReport(PeerReport),
+}
+
 pub struct OnDemandJustificationsEngine<B: Block> {
 	network: Arc<dyn NetworkRequest + Send + Sync>,
 	protocol_name: ProtocolName,
@@ -84,12 +98,10 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
 	}
 
 	fn reset_peers_cache_for_block(&mut self, block: NumberFor<B>) {
-		// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
 		self.peers_cache = self.live_peers.lock().further_than(block);
 	}
 
 	fn try_next_peer(&mut self) -> Option<PeerId> {
-		// TODO (issue #12296): replace peer selection with generic one that involves all protocols.
 		let live = self.live_peers.lock();
 		while let Some(peer) = self.peers_cache.pop_front() {
 			if live.contains(&peer) {
@@ -159,24 +171,19 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
 
 	fn process_response(
 		&mut self,
-		peer: PeerId,
+		peer: &PeerId,
 		req_info: &RequestInfo<B>,
 		response: Result<Response, Canceled>,
 	) -> Result<BeefyVersionedFinalityProof<B>, Error> {
 		response
 			.map_err(|e| {
-				metric_inc!(self, beefy_on_demand_justification_peer_hang_up);
 				debug!(
 					target: BEEFY_SYNC_LOG_TARGET,
-					"🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}",
-					req_info.block,
-					peer,
-					e
+					"🥩 on-demand sc-network channel sender closed, err: {:?}", e
 				);
-				Error::InvalidResponse
+				Error::ResponseError
 			})?
 			.map_err(|e| {
-				metric_inc!(self, beefy_on_demand_justification_peer_error);
 				debug!(
 					target: BEEFY_SYNC_LOG_TARGET,
 					"🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
@@ -184,7 +191,18 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
 					peer,
 					e
 				);
-				Error::InvalidResponse
+				match e {
+					RequestFailure::Refused => {
+						metric_inc!(self, beefy_on_demand_justification_peer_refused);
+						let peer_report =
+							PeerReport { who: *peer, cost_benefit: cost::REFUSAL_RESPONSE };
+						Error::InvalidResponse(peer_report)
+					},
+					_ => {
+						metric_inc!(self, beefy_on_demand_justification_peer_error);
+						Error::ResponseError
+					},
+				}
 			})
 			.and_then(|encoded| {
 				decode_and_verify_finality_proof::<B>(
@@ -192,23 +210,26 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
 					req_info.block,
 					&req_info.active_set,
 				)
-				.map_err(|e| {
+				.map_err(|(err, signatures_checked)| {
 					metric_inc!(self, beefy_on_demand_justification_invalid_proof);
 					debug!(
 						target: BEEFY_SYNC_LOG_TARGET,
 						"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
-						req_info.block, peer, e
+						req_info.block, peer, err
 					);
-					Error::InvalidResponse
+					let mut cost = cost::INVALID_PROOF;
+					cost.value +=
+						cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
+					Error::InvalidResponse(PeerReport { who: *peer, cost_benefit: cost })
 				})
 			})
 	}
 
-	pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
+	pub(crate) async fn next(&mut self) -> ResponseInfo<B> {
 		let (peer, req_info, resp) = match &mut self.state {
 			State::Idle => {
 				futures::future::pending::<()>().await;
-				return None
+				return ResponseInfo::Pending
 			},
 			State::AwaitingResponse(peer, req_info, receiver) => {
 				let resp = receiver.await;
@@ -220,8 +241,8 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
 		self.state = State::Idle;
 
 		let block = req_info.block;
-		self.process_response(peer, &req_info, resp)
-			.map_err(|_| {
+		match self.process_response(&peer, &req_info, resp) {
+			Err(err) => {
 				// No valid justification received, try next peer in our set.
 				if let Some(peer) = self.try_next_peer() {
 					self.request_from_peer(peer, req_info);
@@ -231,15 +252,22 @@ impl<B: Block> OnDemandJustificationsEngine<B> {
 						"🥩 ran out of peers to request justif #{:?} from", block
 					);
 				}
-			})
-			.map(|proof| {
+				// Report peer based on error type.
+				if let Error::InvalidResponse(peer_report) = err {
+					ResponseInfo::PeerReport(peer_report)
+				} else {
+					ResponseInfo::Pending
+				}
+			},
+			Ok(proof) => {
 				metric_inc!(self, beefy_on_demand_justification_good_proof);
 				debug!(
 					target: BEEFY_SYNC_LOG_TARGET,
 					"🥩 received valid on-demand justif #{:?} from {:?}", block, peer
 				);
-				proof
-			})
-			.ok()
+				let peer_report = PeerReport { who: peer, cost_benefit: benefit::VALIDATED_PROOF };
+				ResponseInfo::ValidProof(proof, peer_report)
+			},
+		}
 	}
 }
diff --git a/substrate/client/consensus/beefy/src/import.rs b/substrate/client/consensus/beefy/src/import.rs
index dd2ed92ef83536005cfbb8ca71c1fd3ae41e2f29..bda8169d950139103f9d1d949c8129b990f1a0e0 100644
--- a/substrate/client/consensus/beefy/src/import.rs
+++ b/substrate/client/consensus/beefy/src/import.rs
@@ -109,6 +109,7 @@ where
 			.ok_or_else(|| ImportError("Unknown validator set".to_string()))?;
 
 		decode_and_verify_finality_proof::<Block>(&encoded[..], number, &validator_set)
+			.map_err(|(err, _)| err)
 	}
 }
 
diff --git a/substrate/client/consensus/beefy/src/justification.rs b/substrate/client/consensus/beefy/src/justification.rs
index 5175fd17d4ea34b12df398e8e0c86bb73be7b631..731acdfa63389f6a71370d75e14030d9ebdb8029 100644
--- a/substrate/client/consensus/beefy/src/justification.rs
+++ b/substrate/client/consensus/beefy/src/justification.rs
@@ -42,9 +42,9 @@ pub(crate) fn decode_and_verify_finality_proof<Block: BlockT>(
 	encoded: &[u8],
 	target_number: NumberFor<Block>,
 	validator_set: &ValidatorSet<AuthorityId>,
-) -> Result<BeefyVersionedFinalityProof<Block>, ConsensusError> {
+) -> Result<BeefyVersionedFinalityProof<Block>, (ConsensusError, u32)> {
 	let proof = <BeefyVersionedFinalityProof<Block>>::decode(&mut &*encoded)
-		.map_err(|_| ConsensusError::InvalidJustification)?;
+		.map_err(|_| (ConsensusError::InvalidJustification, 0))?;
 	verify_with_validator_set::<Block>(target_number, validator_set, &proof).map(|_| proof)
 }
 
@@ -53,14 +53,15 @@ pub(crate) fn verify_with_validator_set<Block: BlockT>(
 	target_number: NumberFor<Block>,
 	validator_set: &ValidatorSet<AuthorityId>,
 	proof: &BeefyVersionedFinalityProof<Block>,
-) -> Result<(), ConsensusError> {
+) -> Result<(), (ConsensusError, u32)> {
+	let mut signatures_checked = 0u32;
 	match proof {
 		VersionedFinalityProof::V1(signed_commitment) => {
 			if signed_commitment.signatures.len() != validator_set.len() ||
 				signed_commitment.commitment.validator_set_id != validator_set.id() ||
 				signed_commitment.commitment.block_number != target_number
 			{
-				return Err(ConsensusError::InvalidJustification)
+				return Err((ConsensusError::InvalidJustification, 0))
 			}
 
 			// Arrangement of signatures in the commitment should be in the same order
@@ -73,14 +74,17 @@ pub(crate) fn verify_with_validator_set<Block: BlockT>(
 				.filter(|(id, signature)| {
 					signature
 						.as_ref()
-						.map(|sig| BeefyKeystore::verify(id, sig, &message[..]))
+						.map(|sig| {
+							signatures_checked += 1;
+							BeefyKeystore::verify(id, sig, &message[..])
+						})
 						.unwrap_or(false)
 				})
 				.count();
 			if valid_signatures >= crate::round::threshold(validator_set.len()) {
 				Ok(())
 			} else {
-				Err(ConsensusError::InvalidJustification)
+				Err((ConsensusError::InvalidJustification, signatures_checked))
 			}
 		},
 	}
@@ -127,16 +131,16 @@ pub(crate) mod tests {
 		// wrong block number -> should fail verification
 		let good_proof = proof.clone().into();
 		match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &good_proof) {
-			Err(ConsensusError::InvalidJustification) => (),
-			_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
+			Err((ConsensusError::InvalidJustification, 0)) => (),
+			e => assert!(false, "Got unexpected {:?}", e),
 		};
 
 		// wrong validator set id -> should fail verification
 		let good_proof = proof.clone().into();
 		let other = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap();
 		match verify_with_validator_set::<Block>(block_num, &other, &good_proof) {
-			Err(ConsensusError::InvalidJustification) => (),
-			_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
+			Err((ConsensusError::InvalidJustification, 0)) => (),
+			e => assert!(false, "Got unexpected {:?}", e),
 		};
 
 		// wrong signatures length -> should fail verification
@@ -147,8 +151,8 @@ pub(crate) mod tests {
 		};
 		bad_signed_commitment.signatures.pop().flatten().unwrap();
 		match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &bad_proof.into()) {
-			Err(ConsensusError::InvalidJustification) => (),
-			_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
+			Err((ConsensusError::InvalidJustification, 0)) => (),
+			e => assert!(false, "Got unexpected {:?}", e),
 		};
 
 		// not enough signatures -> should fail verification
@@ -158,9 +162,9 @@ pub(crate) mod tests {
 		};
 		// remove a signature (but same length)
 		*bad_signed_commitment.signatures.first_mut().unwrap() = None;
-		match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &bad_proof.into()) {
-			Err(ConsensusError::InvalidJustification) => (),
-			_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
+		match verify_with_validator_set::<Block>(block_num, &validator_set, &bad_proof.into()) {
+			Err((ConsensusError::InvalidJustification, 2)) => (),
+			e => assert!(false, "Got unexpected {:?}", e),
 		};
 
 		// not enough _correct_ signatures -> should fail verification
@@ -171,9 +175,9 @@ pub(crate) mod tests {
 		// change a signature to a different key
 		*bad_signed_commitment.signatures.first_mut().unwrap() =
 			Some(Keyring::Dave.sign(&bad_signed_commitment.commitment.encode()));
-		match verify_with_validator_set::<Block>(block_num + 1, &validator_set, &bad_proof.into()) {
-			Err(ConsensusError::InvalidJustification) => (),
-			_ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"),
+		match verify_with_validator_set::<Block>(block_num, &validator_set, &bad_proof.into()) {
+			Err((ConsensusError::InvalidJustification, 3)) => (),
+			e => assert!(false, "Got unexpected {:?}", e),
 		};
 	}
 
diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs
index 3c66cc6eb716d012651101c549d3613f2f7acca7..d3e5e4bc68936c4e9abc69b73fcdaff2c7427b9b 100644
--- a/substrate/client/consensus/beefy/src/lib.rs
+++ b/substrate/client/consensus/beefy/src/lib.rs
@@ -52,7 +52,11 @@ use sp_consensus_beefy::{
 use sp_keystore::KeystorePtr;
 use sp_mmr_primitives::MmrApi;
 use sp_runtime::traits::{Block, Zero};
-use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
+use std::{
+	collections::{BTreeMap, VecDeque},
+	marker::PhantomData,
+	sync::Arc,
+};
 
 mod aux_schema;
 mod error;
@@ -249,9 +253,10 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 	let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
 	// Default votes filter is to discard everything.
 	// Validator is updated later with correct starting round and set id.
-	let gossip_validator =
-		Arc::new(communication::gossip::GossipValidator::new(known_peers.clone()));
-	let mut gossip_engine = sc_network_gossip::GossipEngine::new(
+	let (gossip_validator, gossip_report_stream) =
+		communication::gossip::GossipValidator::new(known_peers.clone());
+	let gossip_validator = Arc::new(gossip_validator);
+	let mut gossip_engine = GossipEngine::new(
 		network.clone(),
 		sync.clone(),
 		gossip_protocol_name,
@@ -295,7 +300,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 		return
 	}
 
-	let worker_params = worker::WorkerParams {
+	let worker = worker::BeefyWorker {
 		backend,
 		payload_provider,
 		runtime,
@@ -303,14 +308,14 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 		key_store: key_store.into(),
 		gossip_engine,
 		gossip_validator,
+		gossip_report_stream,
 		on_demand_justifications,
 		links,
 		metrics,
+		pending_justifications: BTreeMap::new(),
 		persisted_state,
 	};
 
-	let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params);
-
 	futures::future::join(
 		worker.run(block_import_justif, finality_notifications),
 		on_demand_justifications_handler.run(),
diff --git a/substrate/client/consensus/beefy/src/metrics.rs b/substrate/client/consensus/beefy/src/metrics.rs
index 6653763fc6754dd8d44677a4b0c2e681cb463d1c..031748bdceab5d3553f35b27fca4b6a2afdf70c9 100644
--- a/substrate/client/consensus/beefy/src/metrics.rs
+++ b/substrate/client/consensus/beefy/src/metrics.rs
@@ -228,8 +228,8 @@ impl PrometheusRegister for OnDemandIncomingRequestsMetrics {
 pub struct OnDemandOutgoingRequestsMetrics {
 	/// Number of times there was no good peer to request justification from
 	pub beefy_on_demand_justification_no_peer_to_request_from: Counter<U64>,
-	/// Number of on-demand justification peer hang up
-	pub beefy_on_demand_justification_peer_hang_up: Counter<U64>,
+	/// Number of on-demand justification peer refused valid requests
+	pub beefy_on_demand_justification_peer_refused: Counter<U64>,
 	/// Number of on-demand justification peer error
 	pub beefy_on_demand_justification_peer_error: Counter<U64>,
 	/// Number of on-demand justification invalid proof
@@ -249,10 +249,10 @@ impl PrometheusRegister for OnDemandOutgoingRequestsMetrics {
 				)?,
 				registry,
 			)?,
-			beefy_on_demand_justification_peer_hang_up: register(
+			beefy_on_demand_justification_peer_refused: register(
 				Counter::new(
-					"substrate_beefy_on_demand_justification_peer_hang_up",
-					"Number of on-demand justification peer hang up",
+					"beefy_on_demand_justification_peer_refused",
+					"Number of on-demand justification peer refused valid requests",
 				)?,
 				registry,
 			)?,
diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs
index f36c2cd68f97f52e582ae7df2b6f4a0ca266a476..48ecebdac3581385ad7feb166fb91c3834f8f331 100644
--- a/substrate/client/consensus/beefy/src/tests.rs
+++ b/substrate/client/consensus/beefy/src/tests.rs
@@ -24,6 +24,7 @@ use crate::{
 	communication::{
 		gossip::{
 			proofs_topic, tests::sign_commitment, votes_topic, GossipFilterCfg, GossipMessage,
+			GossipValidator,
 		},
 		request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler},
 	},
@@ -357,8 +358,8 @@ async fn voter_init_setup(
 ) -> sp_blockchain::Result<PersistedState<Block>> {
 	let backend = net.peer(0).client().as_backend();
 	let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
-	let gossip_validator =
-		Arc::new(crate::communication::gossip::GossipValidator::new(known_peers));
+	let (gossip_validator, _) = GossipValidator::new(known_peers);
+	let gossip_validator = Arc::new(gossip_validator);
 	let mut gossip_engine = sc_network_gossip::GossipEngine::new(
 		net.peer(0).network_service().clone(),
 		net.peer(0).sync_service().clone(),
@@ -1262,8 +1263,8 @@ async fn gossipped_finality_proofs() {
 	let charlie = &net.peers[2];
 	let known_peers = Arc::new(Mutex::new(KnownPeers::<Block>::new()));
 	// Charlie will run just the gossip engine and not the full voter.
-	let charlie_gossip_validator =
-		Arc::new(crate::communication::gossip::GossipValidator::new(known_peers));
+	let (gossip_validator, _) = GossipValidator::new(known_peers);
+	let charlie_gossip_validator = Arc::new(gossip_validator);
 	charlie_gossip_validator.update_filter(GossipFilterCfg::<Block> {
 		start: 1,
 		end: 10,
diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs
index 19225ec214578cb2ff438aca9d58073a7d53bf78..c05de197d58fdef619fd8cc915a75dcdd0774ad3 100644
--- a/substrate/client/consensus/beefy/src/worker.rs
+++ b/substrate/client/consensus/beefy/src/worker.rs
@@ -19,7 +19,8 @@
 use crate::{
 	communication::{
 		gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator},
-		request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
+		peers::PeerReport,
+		request_response::outgoing_requests_engine::{OnDemandJustificationsEngine, ResponseInfo},
 	},
 	error::Error,
 	justification::BeefyVersionedFinalityProof,
@@ -34,7 +35,7 @@ use futures::{stream::Fuse, FutureExt, StreamExt};
 use log::{debug, error, info, log_enabled, trace, warn};
 use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend};
 use sc_network_gossip::GossipEngine;
-use sc_utils::notification::NotificationReceiver;
+use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver};
 use sp_api::{BlockId, ProvideRuntimeApi};
 use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
 use sp_consensus::SyncOracle;
@@ -255,20 +256,6 @@ impl<B: Block> VoterOracle<B> {
 	}
 }
 
-pub(crate) struct WorkerParams<B: Block, BE, P, R, S> {
-	pub backend: Arc<BE>,
-	pub payload_provider: P,
-	pub runtime: Arc<R>,
-	pub sync: Arc<S>,
-	pub key_store: BeefyKeystore,
-	pub gossip_engine: GossipEngine<B>,
-	pub gossip_validator: Arc<GossipValidator<B>>,
-	pub on_demand_justifications: OnDemandJustificationsEngine<B>,
-	pub links: BeefyVoterLinks<B>,
-	pub metrics: Option<VoterMetrics>,
-	pub persisted_state: PersistedState<B>,
-}
-
 #[derive(Debug, Decode, Encode, PartialEq)]
 pub(crate) struct PersistedState<B: Block> {
 	/// Best block we voted on.
@@ -311,28 +298,29 @@ impl<B: Block> PersistedState<B> {
 /// A BEEFY worker plays the BEEFY protocol
 pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
 	// utilities
-	backend: Arc<BE>,
-	payload_provider: P,
-	runtime: Arc<RuntimeApi>,
-	sync: Arc<S>,
-	key_store: BeefyKeystore,
+	pub backend: Arc<BE>,
+	pub payload_provider: P,
+	pub runtime: Arc<RuntimeApi>,
+	pub sync: Arc<S>,
+	pub key_store: BeefyKeystore,
 
 	// communication
-	gossip_engine: GossipEngine<B>,
-	gossip_validator: Arc<GossipValidator<B>>,
-	on_demand_justifications: OnDemandJustificationsEngine<B>,
+	pub gossip_engine: GossipEngine<B>,
+	pub gossip_validator: Arc<GossipValidator<B>>,
+	pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
+	pub on_demand_justifications: OnDemandJustificationsEngine<B>,
 
 	// channels
 	/// Links between the block importer, the background voter and the RPC layer.
-	links: BeefyVoterLinks<B>,
+	pub links: BeefyVoterLinks<B>,
 
 	// voter state
 	/// BEEFY client metrics.
-	metrics: Option<VoterMetrics>,
+	pub metrics: Option<VoterMetrics>,
 	/// Buffer holding justifications for future processing.
-	pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
+	pub pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B>>,
 	/// Persisted voter state.
-	persisted_state: PersistedState<B>,
+	pub persisted_state: PersistedState<B>,
 }
 
 impl<B, BE, P, R, S> BeefyWorker<B, BE, P, R, S>
@@ -344,43 +332,6 @@ where
 	R: ProvideRuntimeApi<B>,
 	R::Api: BeefyApi<B>,
 {
-	/// Return a new BEEFY worker instance.
-	///
-	/// Note that a BEEFY worker is only fully functional if a corresponding
-	/// BEEFY pallet has been deployed on-chain.
-	///
-	/// The BEEFY pallet is needed in order to keep track of the BEEFY authority set.
-	pub(crate) fn new(worker_params: WorkerParams<B, BE, P, R, S>) -> Self {
-		let WorkerParams {
-			backend,
-			payload_provider,
-			runtime,
-			key_store,
-			sync,
-			gossip_engine,
-			gossip_validator,
-			on_demand_justifications,
-			links,
-			metrics,
-			persisted_state,
-		} = worker_params;
-
-		BeefyWorker {
-			backend,
-			payload_provider,
-			runtime,
-			sync,
-			key_store,
-			gossip_engine,
-			gossip_validator,
-			on_demand_justifications,
-			links,
-			metrics,
-			pending_justifications: BTreeMap::new(),
-			persisted_state,
-		}
-	}
-
 	fn best_grandpa_block(&self) -> NumberFor<B> {
 		*self.persisted_state.voting_oracle.best_grandpa_block_header.number()
 	}
@@ -849,7 +800,12 @@ where
 			// Act on changed 'state'.
 			self.process_new_state();
 
+			// Mutable reference used to drive the gossip engine.
 			let mut gossip_engine = &mut self.gossip_engine;
+			// Use temp val and report after async section,
+			// to avoid having to Mutex-wrap `gossip_engine`.
+			let mut gossip_report: Option<PeerReport> = None;
+
 			// Wait for, and handle external events.
 			// The branches below only change 'state', actual voting happens afterwards,
 			// based on the new resulting 'state'.
@@ -870,11 +826,16 @@ where
 					return;
 				},
 				// Process incoming justifications as these can make some in-flight votes obsolete.
-				justif = self.on_demand_justifications.next().fuse() => {
-					if let Some(justif) = justif {
-						if let Err(err) = self.triage_incoming_justif(justif) {
-							debug!(target: LOG_TARGET, "🥩 {}", err);
-						}
+				response_info = self.on_demand_justifications.next().fuse() => {
+					match response_info {
+						ResponseInfo::ValidProof(justif, peer_report) => {
+							if let Err(err) = self.triage_incoming_justif(justif) {
+								debug!(target: LOG_TARGET, "🥩 {}", err);
+							}
+							gossip_report = Some(peer_report);
+						},
+						ResponseInfo::PeerReport(peer_report) => gossip_report = Some(peer_report),
+						ResponseInfo::Pending => (),
 					}
 				},
 				justif = block_import_justif.next() => {
@@ -918,6 +879,13 @@ where
 						return;
 					}
 				},
+				// Process peer reports.
+				report = self.gossip_report_stream.next() => {
+					gossip_report = report;
+				},
+			}
+			if let Some(PeerReport { who, cost_benefit }) = gossip_report {
+				self.gossip_engine.report(who, cost_benefit);
 			}
 		}
 	}
@@ -1122,7 +1090,8 @@ pub(crate) mod tests {
 		let network = peer.network_service().clone();
 		let sync = peer.sync_service().clone();
 		let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
-		let gossip_validator = Arc::new(GossipValidator::new(known_peers.clone()));
+		let (gossip_validator, gossip_report_stream) = GossipValidator::new(known_peers.clone());
+		let gossip_validator = Arc::new(gossip_validator);
 		let gossip_engine = GossipEngine::new(
 			network.clone(),
 			sync.clone(),
@@ -1152,7 +1121,7 @@ pub(crate) mod tests {
 		)
 		.unwrap();
 		let payload_provider = MmrRootProvider::new(api.clone());
-		let worker_params = crate::worker::WorkerParams {
+		BeefyWorker {
 			backend,
 			payload_provider,
 			runtime: api,
@@ -1160,12 +1129,13 @@ pub(crate) mod tests {
 			links,
 			gossip_engine,
 			gossip_validator,
+			gossip_report_stream,
 			metrics,
 			sync: Arc::new(sync),
 			on_demand_justifications,
+			pending_justifications: BTreeMap::new(),
 			persisted_state,
-		};
-		BeefyWorker::<_, _, _, _, _>::new(worker_params)
+		}
 	}
 
 	#[test]