diff --git a/substrate/client/consensus/beefy/src/communication/gossip.rs b/substrate/client/consensus/beefy/src/communication/gossip.rs
index 2b5e772c0578f046aef8836b9c3959dfeea39e27..376172fc23370c45e6c59c044f11e61ddc56a6a8 100644
--- a/substrate/client/consensus/beefy/src/communication/gossip.rs
+++ b/substrate/client/consensus/beefy/src/communication/gossip.rs
@@ -28,10 +28,17 @@ use log::{debug, trace};
 use parking_lot::{Mutex, RwLock};
 use wasm_timer::Instant;
 
-use crate::{communication::peers::KnownPeers, keystore::BeefyKeystore, LOG_TARGET};
+use crate::{
+	communication::peers::KnownPeers,
+	justification::{
+		proof_block_num_and_set_id, verify_with_validator_set, BeefyVersionedFinalityProof,
+	},
+	keystore::BeefyKeystore,
+	LOG_TARGET,
+};
 use sp_consensus_beefy::{
-	crypto::{Public, Signature},
-	ValidatorSetId, VoteMessage,
+	crypto::{AuthorityId, Signature},
+	ValidatorSet, ValidatorSetId, VoteMessage,
 };
 
 // Timeout for rebroadcasting messages.
@@ -40,59 +47,128 @@ const REBROADCAST_AFTER: Duration = Duration::from_secs(60);
 #[cfg(test)]
 const REBROADCAST_AFTER: Duration = Duration::from_secs(5);
 
-/// Gossip engine messages topic
-pub(crate) fn topic<B: Block>() -> B::Hash
+/// BEEFY gossip message type that gets encoded and sent on the network.
+#[derive(Debug, Encode, Decode)]
+pub(crate) enum GossipMessage<B: Block> {
+	/// BEEFY message with commitment and single signature.
+	Vote(VoteMessage<NumberFor<B>, AuthorityId, Signature>),
+	/// BEEFY justification with commitment and signatures.
+	FinalityProof(BeefyVersionedFinalityProof<B>),
+}
+
+impl<B: Block> GossipMessage<B> {
+	/// Return inner vote if this message is a Vote.
+	pub fn unwrap_vote(self) -> Option<VoteMessage<NumberFor<B>, AuthorityId, Signature>> {
+		match self {
+			GossipMessage::Vote(vote) => Some(vote),
+			GossipMessage::FinalityProof(_) => None,
+		}
+	}
+
+	/// Return inner finality proof if this message is a FinalityProof.
+	pub fn unwrap_finality_proof(self) -> Option<BeefyVersionedFinalityProof<B>> {
+		match self {
+			GossipMessage::Vote(_) => None,
+			GossipMessage::FinalityProof(proof) => Some(proof),
+		}
+	}
+}
+
+/// Gossip engine votes messages topic
+pub(crate) fn votes_topic<B: Block>() -> B::Hash
 where
 	B: Block,
 {
-	<<B::Header as Header>::Hashing as Hash>::hash(b"beefy")
+	<<B::Header as Header>::Hashing as Hash>::hash(b"beefy-votes")
 }
 
-#[derive(Debug)]
-pub(crate) struct GossipVoteFilter<B: Block> {
-	pub start: NumberFor<B>,
-	pub end: NumberFor<B>,
-	pub validator_set_id: ValidatorSetId,
+/// Gossip engine justifications messages topic
+pub(crate) fn proofs_topic<B: Block>() -> B::Hash
+where
+	B: Block,
+{
+	<<B::Header as Header>::Hashing as Hash>::hash(b"beefy-justifications")
 }
 
 /// A type that represents hash of the message.
 pub type MessageHash = [u8; 8];
 
-struct VotesFilter<B: Block> {
-	filter: Option<GossipVoteFilter<B>>,
-	live: BTreeMap<NumberFor<B>, fnv::FnvHashSet<MessageHash>>,
+#[derive(Clone, Debug)]
+pub(crate) struct GossipFilterCfg<'a, B: Block> {
+	pub start: NumberFor<B>,
+	pub end: NumberFor<B>,
+	pub validator_set: &'a ValidatorSet<AuthorityId>,
+}
+
+#[derive(Clone, Debug)]
+struct FilterInner<B: Block> {
+	pub start: NumberFor<B>,
+	pub end: NumberFor<B>,
+	pub validator_set: ValidatorSet<AuthorityId>,
+}
+
+struct Filter<B: Block> {
+	inner: Option<FilterInner<B>>,
+	live_votes: BTreeMap<NumberFor<B>, fnv::FnvHashSet<MessageHash>>,
 }
 
-impl<B: Block> VotesFilter<B> {
+impl<B: Block> Filter<B> {
 	pub fn new() -> Self {
-		Self { filter: None, live: BTreeMap::new() }
+		Self { inner: None, live_votes: BTreeMap::new() }
 	}
 
 	/// Update filter to new `start` and `set_id`.
-	fn update(&mut self, filter: GossipVoteFilter<B>) {
-		self.live.retain(|&round, _| round >= filter.start && round <= filter.end);
-		self.filter = Some(filter);
+	fn update(&mut self, cfg: GossipFilterCfg<B>) {
+		self.live_votes.retain(|&round, _| round >= cfg.start && round <= cfg.end);
+		// only clone+overwrite big validator_set if set_id changed
+		match self.inner.as_mut() {
+			Some(f) if f.validator_set.id() == cfg.validator_set.id() => {
+				f.start = cfg.start;
+				f.end = cfg.end;
+			},
+			_ =>
+				self.inner = Some(FilterInner {
+					start: cfg.start,
+					end: cfg.end,
+					validator_set: cfg.validator_set.clone(),
+				}),
+		}
+	}
+
+	/// Return true if `max(session_start, best_beefy) <= round <= best_grandpa`,
+	/// and vote `set_id` matches session set id.
+	///
+	/// Latest concluded round is still considered alive to allow proper gossiping for it.
+	fn is_vote_accepted(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> bool {
+		self.inner
+			.as_ref()
+			.map(|f| set_id == f.validator_set.id() && round >= f.start && round <= f.end)
+			.unwrap_or(false)
 	}
 
 	/// Return true if `round` is >= than `max(session_start, best_beefy)`,
-	/// and vote set id matches session set id.
+	/// and proof `set_id` matches session set id.
 	///
 	/// Latest concluded round is still considered alive to allow proper gossiping for it.
-	fn is_live(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> bool {
-		self.filter
+	fn is_finality_proof_accepted(&self, round: NumberFor<B>, set_id: ValidatorSetId) -> bool {
+		self.inner
 			.as_ref()
-			.map(|f| set_id == f.validator_set_id && round >= f.start && round <= f.end)
+			.map(|f| set_id == f.validator_set.id() && round >= f.start)
 			.unwrap_or(false)
 	}
 
 	/// Add new _known_ `hash` to the round's known votes.
-	fn add_known(&mut self, round: NumberFor<B>, hash: MessageHash) {
-		self.live.entry(round).or_default().insert(hash);
+	fn add_known_vote(&mut self, round: NumberFor<B>, hash: MessageHash) {
+		self.live_votes.entry(round).or_default().insert(hash);
 	}
 
 	/// Check if `hash` is already part of round's known votes.
-	fn is_known(&self, round: NumberFor<B>, hash: &MessageHash) -> bool {
-		self.live.get(&round).map(|known| known.contains(hash)).unwrap_or(false)
+	fn is_known_vote(&self, round: NumberFor<B>, hash: &MessageHash) -> bool {
+		self.live_votes.get(&round).map(|known| known.contains(hash)).unwrap_or(false)
+	}
+
+	fn validator_set(&self) -> Option<&ValidatorSet<AuthorityId>> {
+		self.inner.as_ref().map(|f| &f.validator_set)
 	}
 }
 
@@ -108,8 +184,9 @@ pub(crate) struct GossipValidator<B>
 where
 	B: Block,
 {
-	topic: B::Hash,
-	votes_filter: RwLock<VotesFilter<B>>,
+	votes_topic: B::Hash,
+	justifs_topic: B::Hash,
+	gossip_filter: RwLock<Filter<B>>,
 	next_rebroadcast: Mutex<Instant>,
 	known_peers: Arc<Mutex<KnownPeers<B>>>,
 }
@@ -120,8 +197,9 @@ where
 {
 	pub fn new(known_peers: Arc<Mutex<KnownPeers<B>>>) -> GossipValidator<B> {
 		GossipValidator {
-			topic: topic::<B>(),
-			votes_filter: RwLock::new(VotesFilter::new()),
+			votes_topic: votes_topic::<B>(),
+			justifs_topic: proofs_topic::<B>(),
+			gossip_filter: RwLock::new(Filter::new()),
 			next_rebroadcast: Mutex::new(Instant::now() + REBROADCAST_AFTER),
 			known_peers,
 		}
@@ -130,9 +208,79 @@ where
 	/// Update gossip validator filter.
 	///
 	/// Only votes for `set_id` and rounds `start <= round <= end` will be accepted.
-	pub(crate) fn update_filter(&self, filter: GossipVoteFilter<B>) {
+	pub(crate) fn update_filter(&self, filter: GossipFilterCfg<B>) {
 		debug!(target: LOG_TARGET, "🥩 New gossip filter {:?}", filter);
-		self.votes_filter.write().update(filter);
+		self.gossip_filter.write().update(filter);
+	}
+
+	fn validate_vote(
+		&self,
+		vote: VoteMessage<NumberFor<B>, AuthorityId, Signature>,
+		sender: &PeerId,
+		data: &[u8],
+	) -> ValidationResult<B::Hash> {
+		let msg_hash = twox_64(data);
+		let round = vote.commitment.block_number;
+		let set_id = vote.commitment.validator_set_id;
+		self.known_peers.lock().note_vote_for(*sender, round);
+
+		// Verify general usefulness of the message.
+		// We are going to discard old votes right away (without verification)
+		// Also we keep track of already received votes to avoid verifying duplicates.
+		{
+			let filter = self.gossip_filter.read();
+
+			if !filter.is_vote_accepted(round, set_id) {
+				return ValidationResult::Discard
+			}
+
+			if filter.is_known_vote(round, &msg_hash) {
+				return ValidationResult::ProcessAndKeep(self.votes_topic)
+			}
+		}
+
+		if BeefyKeystore::verify(&vote.id, &vote.signature, &vote.commitment.encode()) {
+			self.gossip_filter.write().add_known_vote(round, msg_hash);
+			ValidationResult::ProcessAndKeep(self.votes_topic)
+		} else {
+			// TODO: report peer
+			debug!(
+				target: LOG_TARGET,
+				"🥩 Bad signature on message: {:?}, from: {:?}", vote, sender
+			);
+			ValidationResult::Discard
+		}
+	}
+
+	fn validate_finality_proof(
+		&self,
+		proof: BeefyVersionedFinalityProof<B>,
+		sender: &PeerId,
+	) -> ValidationResult<B::Hash> {
+		let (round, set_id) = proof_block_num_and_set_id::<B>(&proof);
+		self.known_peers.lock().note_vote_for(*sender, round);
+
+		let guard = self.gossip_filter.read();
+		// Verify general usefulness of the justifications.
+		if !guard.is_finality_proof_accepted(round, set_id) {
+			return ValidationResult::Discard
+		}
+		// Verify justification signatures.
+		guard
+			.validator_set()
+			.map(|validator_set| {
+				if let Ok(()) = verify_with_validator_set::<B>(round, validator_set, &proof) {
+					ValidationResult::ProcessAndKeep(self.justifs_topic)
+				} else {
+					// TODO: report peer
+					debug!(
+						target: LOG_TARGET,
+						"🥩 Bad signatures on message: {:?}, from: {:?}", proof, sender
+					);
+					ValidationResult::Discard
+				}
+			})
+			.unwrap_or(ValidationResult::Discard)
 	}
 }
 
@@ -150,57 +298,38 @@ where
 		sender: &PeerId,
 		mut data: &[u8],
 	) -> ValidationResult<B::Hash> {
-		if let Ok(msg) = VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
-			let msg_hash = twox_64(data);
-			let round = msg.commitment.block_number;
-			let set_id = msg.commitment.validator_set_id;
-			self.known_peers.lock().note_vote_for(*sender, round);
-
-			// Verify general usefulness of the message.
-			// We are going to discard old votes right away (without verification)
-			// Also we keep track of already received votes to avoid verifying duplicates.
-			{
-				let filter = self.votes_filter.read();
-
-				if !filter.is_live(round, set_id) {
-					return ValidationResult::Discard
-				}
-
-				if filter.is_known(round, &msg_hash) {
-					return ValidationResult::ProcessAndKeep(self.topic)
-				}
-			}
-
-			if BeefyKeystore::verify(&msg.id, &msg.signature, &msg.commitment.encode()) {
-				self.votes_filter.write().add_known(round, msg_hash);
-				return ValidationResult::ProcessAndKeep(self.topic)
-			} else {
-				// TODO: report peer
-				debug!(
-					target: LOG_TARGET,
-					"🥩 Bad signature on message: {:?}, from: {:?}", msg, sender
-				);
-			}
+		match GossipMessage::<B>::decode(&mut data) {
+			Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender, data),
+			Ok(GossipMessage::FinalityProof(proof)) => self.validate_finality_proof(proof, sender),
+			Err(e) => {
+				debug!(target: LOG_TARGET, "Error decoding message: {}", e);
+				ValidationResult::Discard
+			},
 		}
-
-		ValidationResult::Discard
 	}
 
 	fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
-		let filter = self.votes_filter.read();
-		Box::new(move |_topic, mut data| {
-			let msg = match VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
-				Ok(vote) => vote,
-				Err(_) => return true,
-			};
-
-			let round = msg.commitment.block_number;
-			let set_id = msg.commitment.validator_set_id;
-			let expired = !filter.is_live(round, set_id);
-
-			trace!(target: LOG_TARGET, "🥩 Message for round #{} expired: {}", round, expired);
-
-			expired
+		let filter = self.gossip_filter.read();
+		Box::new(move |_topic, mut data| match GossipMessage::<B>::decode(&mut data) {
+			Ok(GossipMessage::Vote(msg)) => {
+				let round = msg.commitment.block_number;
+				let set_id = msg.commitment.validator_set_id;
+				let expired = !filter.is_vote_accepted(round, set_id);
+				trace!(target: LOG_TARGET, "🥩 Vote for round #{} expired: {}", round, expired);
+				expired
+			},
+			Ok(GossipMessage::FinalityProof(proof)) => {
+				let (round, set_id) = proof_block_num_and_set_id::<B>(&proof);
+				let expired = !filter.is_finality_proof_accepted(round, set_id);
+				trace!(
+					target: LOG_TARGET,
+					"🥩 Finality proof for round #{} expired: {}",
+					round,
+					expired
+				);
+				expired
+			},
+			Err(_) => true,
 		})
 	}
 
@@ -219,68 +348,80 @@ where
 			}
 		};
 
-		let filter = self.votes_filter.read();
+		let filter = self.gossip_filter.read();
 		Box::new(move |_who, intent, _topic, mut data| {
 			if let MessageIntent::PeriodicRebroadcast = intent {
 				return do_rebroadcast
 			}
 
-			let msg = match VoteMessage::<NumberFor<B>, Public, Signature>::decode(&mut data) {
-				Ok(vote) => vote,
-				Err(_) => return false,
-			};
-
-			let round = msg.commitment.block_number;
-			let set_id = msg.commitment.validator_set_id;
-			let allowed = filter.is_live(round, set_id);
-
-			trace!(target: LOG_TARGET, "🥩 Message for round #{} allowed: {}", round, allowed);
-
-			allowed
+			match GossipMessage::<B>::decode(&mut data) {
+				Ok(GossipMessage::Vote(msg)) => {
+					let round = msg.commitment.block_number;
+					let set_id = msg.commitment.validator_set_id;
+					let allowed = filter.is_vote_accepted(round, set_id);
+					trace!(target: LOG_TARGET, "🥩 Vote for round #{} allowed: {}", round, allowed);
+					allowed
+				},
+				Ok(GossipMessage::FinalityProof(proof)) => {
+					let (round, set_id) = proof_block_num_and_set_id::<B>(&proof);
+					let allowed = filter.is_finality_proof_accepted(round, set_id);
+					trace!(
+						target: LOG_TARGET,
+						"🥩 Finality proof for round #{} allowed: {}",
+						round,
+						allowed
+					);
+					allowed
+				},
+				Err(_) => false,
+			}
 		})
 	}
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
 	use super::*;
 	use crate::keystore::BeefyKeystore;
 	use sc_network_test::Block;
 	use sp_consensus_beefy::{
-		crypto::Signature, known_payloads, Commitment, Keyring, MmrRootHash, Payload, VoteMessage,
-		KEY_TYPE,
+		crypto::Signature, known_payloads, Commitment, Keyring, MmrRootHash, Payload,
+		SignedCommitment, VoteMessage, KEY_TYPE,
 	};
 	use sp_keystore::{testing::MemoryKeystore, Keystore};
 
 	#[test]
 	fn known_votes_insert_remove() {
-		let mut kv = VotesFilter::<Block>::new();
+		let mut filter = Filter::<Block>::new();
 		let msg_hash = twox_64(b"data");
-
-		kv.add_known(1, msg_hash);
-		kv.add_known(1, msg_hash);
-		kv.add_known(2, msg_hash);
-		assert_eq!(kv.live.len(), 2);
-
-		kv.add_known(3, msg_hash);
-		assert!(kv.is_known(3, &msg_hash));
-		assert!(!kv.is_known(3, &twox_64(b"other")));
-		assert!(!kv.is_known(4, &msg_hash));
-		assert_eq!(kv.live.len(), 3);
-
-		assert!(kv.filter.is_none());
-		assert!(!kv.is_live(1, 1));
-
-		kv.update(GossipVoteFilter { start: 3, end: 10, validator_set_id: 1 });
-		assert_eq!(kv.live.len(), 1);
-		assert!(kv.live.contains_key(&3));
-		assert!(!kv.is_live(2, 1));
-		assert!(kv.is_live(3, 1));
-		assert!(kv.is_live(4, 1));
-		assert!(!kv.is_live(4, 2));
-
-		kv.update(GossipVoteFilter { start: 5, end: 10, validator_set_id: 2 });
-		assert!(kv.live.is_empty());
+		let keys = vec![Keyring::Alice.public()];
+		let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 1).unwrap();
+
+		filter.add_known_vote(1, msg_hash);
+		filter.add_known_vote(1, msg_hash);
+		filter.add_known_vote(2, msg_hash);
+		assert_eq!(filter.live_votes.len(), 2);
+
+		filter.add_known_vote(3, msg_hash);
+		assert!(filter.is_known_vote(3, &msg_hash));
+		assert!(!filter.is_known_vote(3, &twox_64(b"other")));
+		assert!(!filter.is_known_vote(4, &msg_hash));
+		assert_eq!(filter.live_votes.len(), 3);
+
+		assert!(filter.inner.is_none());
+		assert!(!filter.is_vote_accepted(1, 1));
+
+		filter.update(GossipFilterCfg { start: 3, end: 10, validator_set: &validator_set });
+		assert_eq!(filter.live_votes.len(), 1);
+		assert!(filter.live_votes.contains_key(&3));
+		assert!(!filter.is_vote_accepted(2, 1));
+		assert!(filter.is_vote_accepted(3, 1));
+		assert!(filter.is_vote_accepted(4, 1));
+		assert!(!filter.is_vote_accepted(4, 2));
+
+		let validator_set = ValidatorSet::<AuthorityId>::new(keys, 2).unwrap();
+		filter.update(GossipFilterCfg { start: 5, end: 10, validator_set: &validator_set });
+		assert!(filter.live_votes.is_empty());
 	}
 
 	struct TestContext;
@@ -302,14 +443,14 @@ mod tests {
 		}
 	}
 
-	fn sign_commitment<BN: Encode>(who: &Keyring, commitment: &Commitment<BN>) -> Signature {
+	pub fn sign_commitment<BN: Encode>(who: &Keyring, commitment: &Commitment<BN>) -> Signature {
 		let store = MemoryKeystore::new();
 		store.ecdsa_generate_new(KEY_TYPE, Some(&who.to_seed())).unwrap();
 		let beefy_keystore: BeefyKeystore = Some(store.into()).into();
 		beefy_keystore.sign(&who.public(), &commitment.encode()).unwrap()
 	}
 
-	fn dummy_vote(block_number: u64) -> VoteMessage<u64, Public, Signature> {
+	fn dummy_vote(block_number: u64) -> VoteMessage<u64, AuthorityId, Signature> {
 		let payload = Payload::from_single_entry(
 			known_payloads::MMR_ROOT_ID,
 			MmrRootHash::default().encode(),
@@ -320,51 +461,111 @@ mod tests {
 		VoteMessage { commitment, id: Keyring::Alice.public(), signature }
 	}
 
+	pub fn dummy_proof(
+		block_number: u64,
+		validator_set: &ValidatorSet<AuthorityId>,
+	) -> BeefyVersionedFinalityProof<Block> {
+		let payload = Payload::from_single_entry(
+			known_payloads::MMR_ROOT_ID,
+			MmrRootHash::default().encode(),
+		);
+		let commitment = Commitment { payload, block_number, validator_set_id: validator_set.id() };
+		let signatures = validator_set
+			.validators()
+			.iter()
+			.map(|validator: &AuthorityId| {
+				Some(sign_commitment(&Keyring::from_public(validator).unwrap(), &commitment))
+			})
+			.collect();
+
+		BeefyVersionedFinalityProof::<Block>::V1(SignedCommitment { commitment, signatures })
+	}
+
 	#[test]
-	fn should_avoid_verifying_signatures_twice() {
+	fn should_validate_messages() {
+		let keys = vec![Keyring::Alice.public()];
+		let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
 		let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
-		gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
+		gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
 		let sender = sc_network::PeerId::random();
 		let mut context = TestContext;
 
 		let vote = dummy_vote(3);
+		let gossip_vote = GossipMessage::<Block>::Vote(vote.clone());
 
 		// first time the cache should be populated
-		let res = gv.validate(&mut context, &sender, &vote.encode());
+		let res = gv.validate(&mut context, &sender, &gossip_vote.encode());
 
 		assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
 		assert_eq!(
-			gv.votes_filter.read().live.get(&vote.commitment.block_number).map(|x| x.len()),
+			gv.gossip_filter
+				.read()
+				.live_votes
+				.get(&vote.commitment.block_number)
+				.map(|x| x.len()),
 			Some(1)
 		);
 
 		// second time we should hit the cache
-		let res = gv.validate(&mut context, &sender, &vote.encode());
-
+		let res = gv.validate(&mut context, &sender, &gossip_vote.encode());
 		assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
 
 		// next we should quickly reject if the round is not live
-		gv.update_filter(GossipVoteFilter { start: 7, end: 10, validator_set_id: 0 });
+		gv.update_filter(GossipFilterCfg { start: 7, end: 10, validator_set: &validator_set });
 
 		let number = vote.commitment.block_number;
 		let set_id = vote.commitment.validator_set_id;
-		assert!(!gv.votes_filter.read().is_live(number, set_id));
+		assert!(!gv.gossip_filter.read().is_vote_accepted(number, set_id));
 
 		let res = gv.validate(&mut context, &sender, &vote.encode());
+		assert!(matches!(res, ValidationResult::Discard));
+
+		// reject old proof
+		let proof = dummy_proof(5, &validator_set);
+		let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
+		let res = gv.validate(&mut context, &sender, &encoded_proof);
+		assert!(matches!(res, ValidationResult::Discard));
+
+		// accept next proof with good set_id
+		let proof = dummy_proof(7, &validator_set);
+		let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
+		let res = gv.validate(&mut context, &sender, &encoded_proof);
+		assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
+
+		// accept future proof with good set_id
+		let proof = dummy_proof(20, &validator_set);
+		let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
+		let res = gv.validate(&mut context, &sender, &encoded_proof);
+		assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
 
+		// reject proof, wrong set_id
+		let bad_validator_set = ValidatorSet::<AuthorityId>::new(keys, 1).unwrap();
+		let proof = dummy_proof(20, &bad_validator_set);
+		let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
+		let res = gv.validate(&mut context, &sender, &encoded_proof);
+		assert!(matches!(res, ValidationResult::Discard));
+
+		// reject proof, bad signatures (Bob instead of Alice)
+		let bad_validator_set =
+			ValidatorSet::<AuthorityId>::new(vec![Keyring::Bob.public()], 0).unwrap();
+		let proof = dummy_proof(20, &bad_validator_set);
+		let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
+		let res = gv.validate(&mut context, &sender, &encoded_proof);
 		assert!(matches!(res, ValidationResult::Discard));
 	}
 
 	#[test]
 	fn messages_allowed_and_expired() {
+		let keys = vec![Keyring::Alice.public()];
+		let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
 		let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
-		gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
+		gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
 		let sender = sc_network::PeerId::random();
 		let topic = Default::default();
 		let intent = MessageIntent::Broadcast;
 
 		// conclude 2
-		gv.update_filter(GossipVoteFilter { start: 2, end: 10, validator_set_id: 0 });
+		gv.update_filter(GossipFilterCfg { start: 2, end: 10, validator_set: &validator_set });
 		let mut allowed = gv.message_allowed();
 		let mut expired = gv.message_expired();
 
@@ -374,33 +575,68 @@ mod tests {
 
 		// inactive round 1 -> expired
 		let vote = dummy_vote(1);
-		let mut encoded_vote = vote.encode();
+		let mut encoded_vote = GossipMessage::<Block>::Vote(vote).encode();
 		assert!(!allowed(&sender, intent, &topic, &mut encoded_vote));
 		assert!(expired(topic, &mut encoded_vote));
+		let proof = dummy_proof(1, &validator_set);
+		let mut encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
+		assert!(!allowed(&sender, intent, &topic, &mut encoded_proof));
+		assert!(expired(topic, &mut encoded_proof));
 
 		// active round 2 -> !expired - concluded but still gossiped
 		let vote = dummy_vote(2);
-		let mut encoded_vote = vote.encode();
+		let mut encoded_vote = GossipMessage::<Block>::Vote(vote).encode();
 		assert!(allowed(&sender, intent, &topic, &mut encoded_vote));
 		assert!(!expired(topic, &mut encoded_vote));
+		let proof = dummy_proof(2, &validator_set);
+		let mut encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
+		assert!(allowed(&sender, intent, &topic, &mut encoded_proof));
+		assert!(!expired(topic, &mut encoded_proof));
+		// using wrong set_id -> !allowed, expired
+		let bad_validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 1).unwrap();
+		let proof = dummy_proof(2, &bad_validator_set);
+		let mut encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
+		assert!(!allowed(&sender, intent, &topic, &mut encoded_proof));
+		assert!(expired(topic, &mut encoded_proof));
 
 		// in progress round 3 -> !expired
 		let vote = dummy_vote(3);
-		let mut encoded_vote = vote.encode();
+		let mut encoded_vote = GossipMessage::<Block>::Vote(vote).encode();
 		assert!(allowed(&sender, intent, &topic, &mut encoded_vote));
 		assert!(!expired(topic, &mut encoded_vote));
+		let proof = dummy_proof(3, &validator_set);
+		let mut encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
+		assert!(allowed(&sender, intent, &topic, &mut encoded_proof));
+		assert!(!expired(topic, &mut encoded_proof));
 
 		// unseen round 4 -> !expired
-		let vote = dummy_vote(3);
-		let mut encoded_vote = vote.encode();
+		let vote = dummy_vote(4);
+		let mut encoded_vote = GossipMessage::<Block>::Vote(vote).encode();
 		assert!(allowed(&sender, intent, &topic, &mut encoded_vote));
 		assert!(!expired(topic, &mut encoded_vote));
+		let proof = dummy_proof(4, &validator_set);
+		let mut encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
+		assert!(allowed(&sender, intent, &topic, &mut encoded_proof));
+		assert!(!expired(topic, &mut encoded_proof));
+
+		// future round 11 -> expired
+		let vote = dummy_vote(11);
+		let mut encoded_vote = GossipMessage::<Block>::Vote(vote).encode();
+		assert!(!allowed(&sender, intent, &topic, &mut encoded_vote));
+		assert!(expired(topic, &mut encoded_vote));
+		// future proofs allowed while same set_id -> allowed
+		let proof = dummy_proof(11, &validator_set);
+		let mut encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
+		assert!(allowed(&sender, intent, &topic, &mut encoded_proof));
+		assert!(!expired(topic, &mut encoded_proof));
 	}
 
 	#[test]
 	fn messages_rebroadcast() {
+		let keys = vec![Keyring::Alice.public()];
+		let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 0).unwrap();
 		let gv = GossipValidator::<Block>::new(Arc::new(Mutex::new(KnownPeers::new())));
-		gv.update_filter(GossipVoteFilter { start: 0, end: 10, validator_set_id: 0 });
+		gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
 		let sender = sc_network::PeerId::random();
 		let topic = Default::default();
 
diff --git a/substrate/client/consensus/beefy/src/communication/mod.rs b/substrate/client/consensus/beefy/src/communication/mod.rs
index 295d549bb1ba82c1bd832d0eda2183aa00ee0ed8..13735a9d3211bb3d715c159f592f39b1928d908c 100644
--- a/substrate/client/consensus/beefy/src/communication/mod.rs
+++ b/substrate/client/consensus/beefy/src/communication/mod.rs
@@ -29,7 +29,7 @@ pub(crate) mod beefy_protocol_name {
 	use sc_network::ProtocolName;
 
 	/// BEEFY votes gossip protocol name suffix.
-	const GOSSIP_NAME: &str = "/beefy/1";
+	const GOSSIP_NAME: &str = "/beefy/2";
 	/// BEEFY justifications protocol name suffix.
 	const JUSTIFICATIONS_NAME: &str = "/beefy/justifications/1";
 
@@ -86,7 +86,7 @@ mod tests {
 		let genesis_hash = H256::random();
 		let genesis_hex = array_bytes::bytes2hex("", genesis_hash.as_ref());
 
-		let expected_gossip_name = format!("/{}/beefy/1", genesis_hex);
+		let expected_gossip_name = format!("/{}/beefy/2", genesis_hex);
 		let gossip_proto_name = gossip_protocol_name(&genesis_hash, None);
 		assert_eq!(gossip_proto_name.to_string(), expected_gossip_name);
 
@@ -101,7 +101,7 @@ mod tests {
 		];
 		let genesis_hex = "32043c7b3a6ad8f6c2bc8bc121d4caab09377b5e082b0cfbbb39ad13bc4acd93";
 
-		let expected_gossip_name = format!("/{}/beefy/1", genesis_hex);
+		let expected_gossip_name = format!("/{}/beefy/2", genesis_hex);
 		let gossip_proto_name = gossip_protocol_name(&genesis_hash, None);
 		assert_eq!(gossip_proto_name.to_string(), expected_gossip_name);
 
diff --git a/substrate/client/consensus/beefy/src/justification.rs b/substrate/client/consensus/beefy/src/justification.rs
index 1bd250b2a25f3f3110282e17e8f354c058d940db..5175fd17d4ea34b12df398e8e0c86bb73be7b631 100644
--- a/substrate/client/consensus/beefy/src/justification.rs
+++ b/substrate/client/consensus/beefy/src/justification.rs
@@ -21,13 +21,21 @@ use codec::{Decode, Encode};
 use sp_consensus::Error as ConsensusError;
 use sp_consensus_beefy::{
 	crypto::{AuthorityId, Signature},
-	ValidatorSet, VersionedFinalityProof,
+	ValidatorSet, ValidatorSetId, VersionedFinalityProof,
 };
 use sp_runtime::traits::{Block as BlockT, NumberFor};
 
 /// A finality proof with matching BEEFY authorities' signatures.
-pub type BeefyVersionedFinalityProof<Block> =
-	sp_consensus_beefy::VersionedFinalityProof<NumberFor<Block>, Signature>;
+pub type BeefyVersionedFinalityProof<Block> = VersionedFinalityProof<NumberFor<Block>, Signature>;
+
+pub(crate) fn proof_block_num_and_set_id<Block: BlockT>(
+	proof: &BeefyVersionedFinalityProof<Block>,
+) -> (NumberFor<Block>, ValidatorSetId) {
+	match proof {
+		VersionedFinalityProof::V1(sc) =>
+			(sc.commitment.block_number, sc.commitment.validator_set_id),
+	}
+}
 
 /// Decode and verify a Beefy FinalityProof.
 pub(crate) fn decode_and_verify_finality_proof<Block: BlockT>(
@@ -41,7 +49,7 @@ pub(crate) fn decode_and_verify_finality_proof<Block: BlockT>(
 }
 
 /// Verify the Beefy finality proof against the validator set at the block it was generated.
-fn verify_with_validator_set<Block: BlockT>(
+pub(crate) fn verify_with_validator_set<Block: BlockT>(
 	target_number: NumberFor<Block>,
 	validator_set: &ValidatorSet<AuthorityId>,
 	proof: &BeefyVersionedFinalityProof<Block>,
diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs
index b84fa45e7e2f3011cd09ddacb40d15a8de2a82de..3c66cc6eb716d012651101c549d3613f2f7acca7 100644
--- a/substrate/client/consensus/beefy/src/lib.rs
+++ b/substrate/client/consensus/beefy/src/lib.rs
@@ -288,7 +288,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 		};
 	// Update the gossip validator with the right starting round and set id.
 	if let Err(e) = persisted_state
-		.current_gossip_filter()
+		.gossip_filter_config()
 		.map(|f| gossip_validator.update_filter(f))
 	{
 		error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
diff --git a/substrate/client/consensus/beefy/src/round.rs b/substrate/client/consensus/beefy/src/round.rs
index 64d03beeee854b78aac5c69c320021148e693f3d..d8948ff98c5521e3d9e73f3897c87080ed0fe2ef 100644
--- a/substrate/client/consensus/beefy/src/round.rs
+++ b/substrate/client/consensus/beefy/src/round.rs
@@ -21,7 +21,7 @@ use crate::LOG_TARGET;
 use codec::{Decode, Encode};
 use log::debug;
 use sp_consensus_beefy::{
-	crypto::{AuthorityId, Public, Signature},
+	crypto::{AuthorityId, Signature},
 	Commitment, EquivocationProof, SignedCommitment, ValidatorSet, ValidatorSetId, VoteMessage,
 };
 use sp_runtime::traits::{Block, NumberFor};
@@ -33,11 +33,11 @@ use std::collections::BTreeMap;
 /// Does not do any validation on votes or signatures, layers above need to handle that (gossip).
 #[derive(Debug, Decode, Default, Encode, PartialEq)]
 pub(crate) struct RoundTracker {
-	votes: BTreeMap<Public, Signature>,
+	votes: BTreeMap<AuthorityId, Signature>,
 }
 
 impl RoundTracker {
-	fn add_vote(&mut self, vote: (Public, Signature)) -> bool {
+	fn add_vote(&mut self, vote: (AuthorityId, Signature)) -> bool {
 		if self.votes.contains_key(&vote.0) {
 			return false
 		}
@@ -61,7 +61,7 @@ pub fn threshold(authorities: usize) -> usize {
 pub enum VoteImportResult<B: Block> {
 	Ok,
 	RoundConcluded(SignedCommitment<NumberFor<B>, Signature>),
-	Equivocation(EquivocationProof<NumberFor<B>, Public, Signature>),
+	Equivocation(EquivocationProof<NumberFor<B>, AuthorityId, Signature>),
 	Invalid,
 	Stale,
 }
@@ -73,9 +73,10 @@ pub enum VoteImportResult<B: Block> {
 #[derive(Debug, Decode, Encode, PartialEq)]
 pub(crate) struct Rounds<B: Block> {
 	rounds: BTreeMap<Commitment<NumberFor<B>>, RoundTracker>,
-	previous_votes: BTreeMap<(Public, NumberFor<B>), VoteMessage<NumberFor<B>, Public, Signature>>,
+	previous_votes:
+		BTreeMap<(AuthorityId, NumberFor<B>), VoteMessage<NumberFor<B>, AuthorityId, Signature>>,
 	session_start: NumberFor<B>,
-	validator_set: ValidatorSet<Public>,
+	validator_set: ValidatorSet<AuthorityId>,
 	mandatory_done: bool,
 	best_done: Option<NumberFor<B>>,
 }
@@ -84,7 +85,10 @@ impl<B> Rounds<B>
 where
 	B: Block,
 {
-	pub(crate) fn new(session_start: NumberFor<B>, validator_set: ValidatorSet<Public>) -> Self {
+	pub(crate) fn new(
+		session_start: NumberFor<B>,
+		validator_set: ValidatorSet<AuthorityId>,
+	) -> Self {
 		Rounds {
 			rounds: BTreeMap::new(),
 			previous_votes: BTreeMap::new(),
@@ -95,7 +99,7 @@ where
 		}
 	}
 
-	pub(crate) fn validator_set(&self) -> &ValidatorSet<Public> {
+	pub(crate) fn validator_set(&self) -> &ValidatorSet<AuthorityId> {
 		&self.validator_set
 	}
 
@@ -103,7 +107,7 @@ where
 		self.validator_set.id()
 	}
 
-	pub(crate) fn validators(&self) -> &[Public] {
+	pub(crate) fn validators(&self) -> &[AuthorityId] {
 		self.validator_set.validators()
 	}
 
@@ -199,11 +203,11 @@ mod tests {
 	use sc_network_test::Block;
 
 	use sp_consensus_beefy::{
-		crypto::Public, known_payloads::MMR_ROOT_ID, Commitment, EquivocationProof, Keyring,
-		Payload, SignedCommitment, ValidatorSet, VoteMessage,
+		known_payloads::MMR_ROOT_ID, Commitment, EquivocationProof, Keyring, Payload,
+		SignedCommitment, ValidatorSet, VoteMessage,
 	};
 
-	use super::{threshold, Block as BlockT, RoundTracker, Rounds};
+	use super::{threshold, AuthorityId, Block as BlockT, RoundTracker, Rounds};
 	use crate::round::VoteImportResult;
 
 	impl<B> Rounds<B>
@@ -251,7 +255,7 @@ mod tests {
 	fn new_rounds() {
 		sp_tracing::try_init_simple();
 
-		let validators = ValidatorSet::<Public>::new(
+		let validators = ValidatorSet::<AuthorityId>::new(
 			vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()],
 			42,
 		)
@@ -272,7 +276,7 @@ mod tests {
 	fn add_and_conclude_votes() {
 		sp_tracing::try_init_simple();
 
-		let validators = ValidatorSet::<Public>::new(
+		let validators = ValidatorSet::<AuthorityId>::new(
 			vec![
 				Keyring::Alice.public(),
 				Keyring::Bob.public(),
@@ -338,7 +342,7 @@ mod tests {
 	fn old_rounds_not_accepted() {
 		sp_tracing::try_init_simple();
 
-		let validators = ValidatorSet::<Public>::new(
+		let validators = ValidatorSet::<AuthorityId>::new(
 			vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()],
 			42,
 		)
@@ -384,7 +388,7 @@ mod tests {
 	fn multiple_rounds() {
 		sp_tracing::try_init_simple();
 
-		let validators = ValidatorSet::<Public>::new(
+		let validators = ValidatorSet::<AuthorityId>::new(
 			vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()],
 			Default::default(),
 		)
@@ -459,7 +463,7 @@ mod tests {
 	fn should_provide_equivocation_proof() {
 		sp_tracing::try_init_simple();
 
-		let validators = ValidatorSet::<Public>::new(
+		let validators = ValidatorSet::<AuthorityId>::new(
 			vec![Keyring::Alice.public(), Keyring::Bob.public()],
 			Default::default(),
 		)
diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs
index 0ad5f108860938b0ed6ef71307f0683b6c55b0bf..f36c2cd68f97f52e582ae7df2b6f4a0ca266a476 100644
--- a/substrate/client/consensus/beefy/src/tests.rs
+++ b/substrate/client/consensus/beefy/src/tests.rs
@@ -21,15 +21,18 @@
 use crate::{
 	aux_schema::{load_persistent, tests::verify_persisted_version},
 	beefy_block_import_and_links,
-	communication::request_response::{
-		on_demand_justifications_protocol_config, BeefyJustifsRequestHandler,
+	communication::{
+		gossip::{
+			proofs_topic, tests::sign_commitment, votes_topic, GossipFilterCfg, GossipMessage,
+		},
+		request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler},
 	},
 	gossip_protocol_name,
 	justification::*,
 	load_or_init_voter_state, wait_for_runtime_pallet, BeefyRPCLinks, BeefyVoterLinks, KnownPeers,
 	PersistedState,
 };
-use futures::{future, stream::FuturesUnordered, Future, StreamExt};
+use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt};
 use parking_lot::Mutex;
 use sc_client_api::{Backend as BackendT, BlockchainEvents, FinalityNotifications, HeaderBackend};
 use sc_consensus::{
@@ -48,16 +51,16 @@ use sp_consensus::BlockOrigin;
 use sp_consensus_beefy::{
 	crypto::{AuthorityId, Signature},
 	known_payloads,
-	mmr::MmrRootProvider,
+	mmr::{find_mmr_root_digest, MmrRootProvider},
 	BeefyApi, Commitment, ConsensusLog, EquivocationProof, Keyring as BeefyKeyring, MmrRootHash,
 	OpaqueKeyOwnershipProof, Payload, SignedCommitment, ValidatorSet, ValidatorSetId,
-	VersionedFinalityProof, BEEFY_ENGINE_ID, KEY_TYPE as BeefyKeyType,
+	VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, KEY_TYPE as BeefyKeyType,
 };
 use sp_core::H256;
 use sp_keystore::{testing::MemoryKeystore, Keystore, KeystorePtr};
 use sp_mmr_primitives::{Error as MmrError, MmrApi};
 use sp_runtime::{
-	codec::Encode,
+	codec::{Decode, Encode},
 	traits::{Header as HeaderT, NumberFor},
 	BuildStorage, DigestItem, EncodedJustification, Justifications, Storage,
 };
@@ -503,16 +506,15 @@ async fn wait_for_beefy_signed_commitments(
 	run_until(wait_for, net).await;
 }
 
-async fn streams_empty_after_timeout<T>(
+async fn streams_empty_after_future<T>(
 	streams: Vec<NotificationReceiver<T>>,
-	net: &Arc<Mutex<BeefyTestNet>>,
-	timeout: Option<Duration>,
+	future: Option<impl Future + Unpin>,
 ) where
 	T: std::fmt::Debug,
 	T: std::cmp::PartialEq,
 {
-	if let Some(timeout) = timeout {
-		run_for(timeout, net).await;
+	if let Some(future) = future {
+		future.await;
 	}
 	for mut stream in streams.into_iter() {
 		future::poll_fn(move |cx| {
@@ -523,6 +525,18 @@ async fn streams_empty_after_timeout<T>(
 	}
 }
 
+async fn streams_empty_after_timeout<T>(
+	streams: Vec<NotificationReceiver<T>>,
+	net: &Arc<Mutex<BeefyTestNet>>,
+	timeout: Option<Duration>,
+) where
+	T: std::fmt::Debug,
+	T: std::cmp::PartialEq,
+{
+	let timeout = timeout.map(|timeout| Box::pin(run_for(timeout, net)));
+	streams_empty_after_future(streams, timeout).await;
+}
+
 async fn finalize_block_and_wait_for_beefy(
 	net: &Arc<Mutex<BeefyTestNet>>,
 	// peer index and key
@@ -1229,3 +1243,143 @@ async fn beefy_reports_equivocations() {
 	assert_eq!(equivocation_proof.first.id, BeefyKeyring::Bob.public());
 	assert_eq!(equivocation_proof.first.commitment.block_number, 1);
 }
+
+#[tokio::test]
+async fn gossipped_finality_proofs() {
+	sp_tracing::try_init_simple();
+
+	let validators = [BeefyKeyring::Alice, BeefyKeyring::Bob, BeefyKeyring::Charlie];
+	// Only Alice and Bob are running the voter -> finality threshold not reached
+	let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob];
+	let validator_set = ValidatorSet::new(make_beefy_ids(&validators), 0).unwrap();
+	let session_len = 30;
+	let min_block_delta = 1;
+
+	let mut net = BeefyTestNet::new(3);
+	let api = Arc::new(TestApi::with_validator_set(&validator_set));
+	let beefy_peers = peers.iter().enumerate().map(|(id, key)| (id, key, api.clone())).collect();
+
+	let charlie = &net.peers[2];
+	let known_peers = Arc::new(Mutex::new(KnownPeers::<Block>::new()));
+	// Charlie will run just the gossip engine and not the full voter.
+	let charlie_gossip_validator =
+		Arc::new(crate::communication::gossip::GossipValidator::new(known_peers));
+	charlie_gossip_validator.update_filter(GossipFilterCfg::<Block> {
+		start: 1,
+		end: 10,
+		validator_set: &validator_set,
+	});
+	let mut charlie_gossip_engine = sc_network_gossip::GossipEngine::new(
+		charlie.network_service().clone(),
+		charlie.sync_service().clone(),
+		beefy_gossip_proto_name(),
+		charlie_gossip_validator.clone(),
+		None,
+	);
+
+	// Alice and Bob run full voter.
+	tokio::spawn(initialize_beefy(&mut net, beefy_peers, min_block_delta));
+
+	let net = Arc::new(Mutex::new(net));
+
+	// Pump net + Charlie gossip to see peers.
+	let timeout = Box::pin(tokio::time::sleep(Duration::from_millis(200)));
+	let gossip_engine_pump = &mut charlie_gossip_engine;
+	let pump_with_timeout = future::select(gossip_engine_pump, timeout);
+	run_until(pump_with_timeout, &net).await;
+
+	// push 10 blocks
+	let hashes = net.lock().generate_blocks_and_sync(10, session_len, &validator_set, true).await;
+
+	let peers = peers.into_iter().enumerate();
+
+	// Alice, Bob and Charlie finalize #1, Alice and Bob vote on it, but not Charlie.
+	let finalize = hashes[1];
+	let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone());
+	net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap();
+	net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap();
+	net.lock().peer(2).client().as_client().finalize_block(finalize, None).unwrap();
+	// verify nothing gets finalized by BEEFY
+	let timeout = Box::pin(tokio::time::sleep(Duration::from_millis(100)));
+	let pump_net = futures::future::poll_fn(|cx| {
+		net.lock().poll(cx);
+		Poll::<()>::Pending
+	});
+	let pump_gossip = &mut charlie_gossip_engine;
+	let pump_with_timeout = future::select(pump_gossip, future::select(pump_net, timeout));
+	streams_empty_after_future(best_blocks, Some(pump_with_timeout)).await;
+	streams_empty_after_timeout(versioned_finality_proof, &net, None).await;
+
+	let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone());
+	// Charlie gossips finality proof for #1 -> Alice and Bob also finalize.
+	let proof = crate::communication::gossip::tests::dummy_proof(1, &validator_set);
+	let gossip_proof = GossipMessage::<Block>::FinalityProof(proof);
+	let encoded_proof = gossip_proof.encode();
+	charlie_gossip_engine.gossip_message(proofs_topic::<Block>(), encoded_proof, true);
+	// Expect #1 is finalized.
+	wait_for_best_beefy_blocks(best_blocks, &net, &[1]).await;
+	wait_for_beefy_signed_commitments(versioned_finality_proof, &net, &[1]).await;
+
+	// Code above verifies gossipped finality proofs are correctly imported and consumed by voters.
+	// Next, let's verify finality proofs are correctly generated and gossipped by voters.
+
+	// Everyone finalizes #2
+	let block_number = 2u64;
+	let finalize = hashes[block_number as usize];
+	let (best_blocks, versioned_finality_proof) = get_beefy_streams(&mut net.lock(), peers.clone());
+	net.lock().peer(0).client().as_client().finalize_block(finalize, None).unwrap();
+	net.lock().peer(1).client().as_client().finalize_block(finalize, None).unwrap();
+	net.lock().peer(2).client().as_client().finalize_block(finalize, None).unwrap();
+
+	// Simulate Charlie vote on #2
+	let header = net.lock().peer(2).client().as_client().expect_header(finalize).unwrap();
+	let mmr_root = find_mmr_root_digest::<Block>(&header).unwrap();
+	let payload = Payload::from_single_entry(known_payloads::MMR_ROOT_ID, mmr_root.encode());
+	let commitment = Commitment { payload, block_number, validator_set_id: validator_set.id() };
+	let signature = sign_commitment(&BeefyKeyring::Charlie, &commitment);
+	let vote_message = VoteMessage { commitment, id: BeefyKeyring::Charlie.public(), signature };
+	let encoded_vote = GossipMessage::<Block>::Vote(vote_message).encode();
+	charlie_gossip_engine.gossip_message(votes_topic::<Block>(), encoded_vote, true);
+
+	// Expect #2 is finalized.
+	wait_for_best_beefy_blocks(best_blocks, &net, &[2]).await;
+	wait_for_beefy_signed_commitments(versioned_finality_proof, &net, &[2]).await;
+
+	// Now verify Charlie also sees the gossipped proof generated by either Alice or Bob.
+	let mut charlie_gossip_proofs = Box::pin(
+		charlie_gossip_engine
+			.messages_for(proofs_topic::<Block>())
+			.filter_map(|notification| async move {
+				GossipMessage::<Block>::decode(&mut &notification.message[..]).ok().and_then(
+					|message| match message {
+						GossipMessage::<Block>::Vote(_) => unreachable!(),
+						GossipMessage::<Block>::FinalityProof(proof) => Some(proof),
+					},
+				)
+			})
+			.fuse(),
+	);
+	loop {
+		let pump_net = futures::future::poll_fn(|cx| {
+			net.lock().poll(cx);
+			Poll::<()>::Pending
+		});
+		let mut gossip_engine = &mut charlie_gossip_engine;
+		futures::select! {
+			// pump gossip engine
+			_ = gossip_engine => unreachable!(),
+			// pump network
+			_ = pump_net.fuse() => unreachable!(),
+			// verify finality proof has been gossipped
+			proof = charlie_gossip_proofs.next() => {
+				let proof = proof.unwrap();
+				let (round, _) = proof_block_num_and_set_id::<Block>(&proof);
+				match round {
+					1 => continue, // finality proof generated by Charlie in the previous round
+					2 => break,	// finality proof generated by Alice or Bob and gossiped to Charlie
+					_ => panic!("Charlie got unexpected finality proof"),
+				}
+			},
+		}
+	}
+}
diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs
index 0260d7693c6540e32804b210e8829672113e6a8d..19225ec214578cb2ff438aca9d58073a7d53bf78 100644
--- a/substrate/client/consensus/beefy/src/worker.rs
+++ b/substrate/client/consensus/beefy/src/worker.rs
@@ -18,7 +18,7 @@
 
 use crate::{
 	communication::{
-		gossip::{topic, GossipValidator, GossipVoteFilter},
+		gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator},
 		request_response::outgoing_requests_engine::OnDemandJustificationsEngine,
 	},
 	error::Error,
@@ -42,7 +42,7 @@ use sp_consensus_beefy::{
 	check_equivocation_proof,
 	crypto::{AuthorityId, Signature},
 	BeefyApi, Commitment, ConsensusLog, EquivocationProof, PayloadProvider, ValidatorSet,
-	ValidatorSetId, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
+	VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
 };
 use sp_runtime::{
 	generic::OpaqueDigestItemId,
@@ -158,8 +158,8 @@ impl<B: Block> VoterOracle<B> {
 		self.sessions.front_mut().ok_or(Error::UninitSession)
 	}
 
-	fn current_validator_set_id(&self) -> Result<ValidatorSetId, Error> {
-		self.active_rounds().map(|r| r.validator_set_id())
+	fn current_validator_set(&self) -> Result<&ValidatorSet<AuthorityId>, Error> {
+		self.active_rounds().map(|r| r.validator_set())
 	}
 
 	// Prune the sessions queue to keep the Oracle in one of the expected three states.
@@ -301,10 +301,10 @@ impl<B: Block> PersistedState<B> {
 		self.voting_oracle.best_grandpa_block_header = best_grandpa;
 	}
 
-	pub(crate) fn current_gossip_filter(&self) -> Result<GossipVoteFilter<B>, Error> {
+	pub(crate) fn gossip_filter_config(&self) -> Result<GossipFilterCfg<B>, Error> {
 		let (start, end) = self.voting_oracle.accepted_interval()?;
-		let validator_set_id = self.voting_oracle.current_validator_set_id()?;
-		Ok(GossipVoteFilter { start, end, validator_set_id })
+		let validator_set = self.voting_oracle.current_validator_set()?;
+		Ok(GossipFilterCfg { start, end, validator_set })
 	}
 }
 
@@ -494,7 +494,7 @@ where
 			// Update gossip validator votes filter.
 			if let Err(e) = self
 				.persisted_state
-				.current_gossip_filter()
+				.gossip_filter_config()
 				.map(|filter| self.gossip_validator.update_filter(filter))
 			{
 				error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e);
@@ -509,7 +509,12 @@ where
 	) -> Result<(), Error> {
 		let block_num = vote.commitment.block_number;
 		match self.voting_oracle().triage_round(block_num)? {
-			RoundAction::Process => self.handle_vote(vote)?,
+			RoundAction::Process =>
+				if let Some(finality_proof) = self.handle_vote(vote)? {
+					let gossip_proof = GossipMessage::<B>::FinalityProof(finality_proof);
+					let encoded_proof = gossip_proof.encode();
+					self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
+				},
 			RoundAction::Drop => metric_inc!(self, beefy_stale_votes),
 			RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
 		};
@@ -554,7 +559,7 @@ where
 	fn handle_vote(
 		&mut self,
 		vote: VoteMessage<NumberFor<B>, AuthorityId, Signature>,
-	) -> Result<(), Error> {
+	) -> Result<Option<BeefyVersionedFinalityProof<B>>, Error> {
 		let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?;
 
 		let block_number = vote.commitment.block_number;
@@ -567,8 +572,9 @@ where
 				);
 				// We created the `finality_proof` and know to be valid.
 				// New state is persisted after finalization.
-				self.finalize(finality_proof)?;
+				self.finalize(finality_proof.clone())?;
 				metric_inc!(self, beefy_good_votes_processed);
+				return Ok(Some(finality_proof))
 			},
 			VoteImportResult::Ok => {
 				// Persist state after handling mandatory block vote.
@@ -590,7 +596,7 @@ where
 			VoteImportResult::Invalid => metric_inc!(self, beefy_invalid_votes),
 			VoteImportResult::Stale => metric_inc!(self, beefy_stale_votes),
 		};
-		Ok(())
+		Ok(None)
 	}
 
 	/// Provide BEEFY finality for block based on `finality_proof`:
@@ -643,7 +649,7 @@ where
 
 		// Update gossip validator votes filter.
 		self.persisted_state
-			.current_gossip_filter()
+			.gossip_filter_config()
 			.map(|filter| self.gossip_validator.update_filter(filter))?;
 		Ok(())
 	}
@@ -758,20 +764,20 @@ where
 			BeefyKeystore::verify(&authority_id, &signature, &encoded_commitment)
 		);
 
-		let message = VoteMessage { commitment, id: authority_id, signature };
-
-		let encoded_message = message.encode();
-
-		metric_inc!(self, beefy_votes_sent);
-
-		debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", message);
-
-		if let Err(err) = self.handle_vote(message) {
+		let vote = VoteMessage { commitment, id: authority_id, signature };
+		if let Some(finality_proof) = self.handle_vote(vote.clone()).map_err(|err| {
 			error!(target: LOG_TARGET, "🥩 Error handling self vote: {}", err);
+			err
+		})? {
+			let encoded_proof = GossipMessage::<B>::FinalityProof(finality_proof).encode();
+			self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
+		} else {
+			metric_inc!(self, beefy_votes_sent);
+			debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
+			let encoded_vote = GossipMessage::<B>::Vote(vote).encode();
+			self.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
 		}
 
-		self.gossip_engine.gossip_message(topic::<B>(), encoded_message, false);
-
 		// Persist state after vote to avoid double voting in case of voter restarts.
 		self.persisted_state.best_voted = target_number;
 		metric_set!(self, beefy_best_voted, target_number);
@@ -816,17 +822,28 @@ where
 
 		let mut votes = Box::pin(
 			self.gossip_engine
-				.messages_for(topic::<B>())
+				.messages_for(votes_topic::<B>())
 				.filter_map(|notification| async move {
-					let vote = VoteMessage::<NumberFor<B>, AuthorityId, Signature>::decode(
-						&mut &notification.message[..],
-					)
-					.ok();
+					let vote = GossipMessage::<B>::decode(&mut &notification.message[..])
+						.ok()
+						.and_then(|message| message.unwrap_vote());
 					trace!(target: LOG_TARGET, "🥩 Got vote message: {:?}", vote);
 					vote
 				})
 				.fuse(),
 		);
+		let mut gossip_proofs = Box::pin(
+			self.gossip_engine
+				.messages_for(proofs_topic::<B>())
+				.filter_map(|notification| async move {
+					let proof = GossipMessage::<B>::decode(&mut &notification.message[..])
+						.ok()
+						.and_then(|message| message.unwrap_finality_proof());
+					trace!(target: LOG_TARGET, "🥩 Got gossip proof message: {:?}", proof);
+					proof
+				})
+				.fuse(),
+		);
 
 		loop {
 			// Act on changed 'state'.
@@ -872,6 +889,20 @@ where
 						return;
 					}
 				},
+				justif = gossip_proofs.next() => {
+					if let Some(justif) = justif {
+						// Gossiped justifications have already been verified by `GossipValidator`.
+						if let Err(err) = self.triage_incoming_justif(justif) {
+							debug!(target: LOG_TARGET, "🥩 {}", err);
+						}
+					} else {
+						error!(
+							target: LOG_TARGET,
+							"🥩 Finality proofs gossiping stream terminated, closing worker."
+						);
+						return;
+					}
+				},
 				// Finally process incoming votes.
 				vote = votes.next() => {
 					if let Some(vote) = vote {
@@ -880,7 +911,10 @@ where
 							debug!(target: LOG_TARGET, "🥩 {}", err);
 						}
 					} else {
-						error!(target: LOG_TARGET, "🥩 Votes gossiping stream terminated, closing worker.");
+						error!(
+							target: LOG_TARGET,
+							"🥩 Votes gossiping stream terminated, closing worker."
+						);
 						return;
 					}
 				},