diff --git a/substrate/core/finality-grandpa/src/communication.rs b/substrate/core/finality-grandpa/src/communication.rs
index 4bfbe4c74e29640d831441e93f5791ecf2dba0e9..18fbcbfeb461335316b13b7b0b492584345cfb3b 100644
--- a/substrate/core/finality-grandpa/src/communication.rs
+++ b/substrate/core/finality-grandpa/src/communication.rs
@@ -28,7 +28,8 @@ use parity_codec::{Encode, Decode};
 use substrate_primitives::{ed25519, Ed25519AuthorityId};
 use runtime_primitives::traits::Block as BlockT;
 use tokio::timer::Interval;
-use crate::{Error, Network, Message, SignedMessage, Commit, CompactCommit};
+use crate::{Error, Network, Message, SignedMessage, Commit,
+	CompactCommit, GossipMessage, FullCommitMessage, VoteOrPrecommitMessage};
 
 fn localized_payload<E: Encode>(round: u64, set_id: u64, message: &E) -> Vec<u8> {
 	(message, round, set_id).encode()
@@ -259,8 +260,6 @@ pub(crate) fn check_message_sig<Block: BlockT>(
 /// converts a message stream into a stream of signed messages.
 /// the output stream checks signatures also.
 pub(crate) fn checked_message_stream<Block: BlockT, S>(
-	round: u64,
-	set_id: u64,
 	inner: S,
 	voters: Arc<VoterSet<Ed25519AuthorityId>>,
 )
@@ -269,28 +268,27 @@ pub(crate) fn checked_message_stream<Block: BlockT, S>(
 {
 	inner
 		.filter_map(|raw| {
-			let decoded = SignedMessage::<Block>::decode(&mut &raw[..]);
+			let decoded = GossipMessage::<Block>::decode(&mut &raw[..]);
 			if decoded.is_none() {
 				debug!(target: "afg", "Skipping malformed message {:?}", raw);
 			}
 			decoded
 		})
 		.and_then(move |msg| {
-			// check signature.
-			if !voters.contains_key(&msg.id) {
-				debug!(target: "afg", "Skipping message from unknown voter {}", msg.id);
-				return Ok(None);
+			match msg {
+				GossipMessage::VoteOrPrecommit(msg) => {
+					// check signature.
+					if !voters.contains_key(&msg.message.id) {
+						debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id);
+						return Ok(None);
+					}
+					Ok(Some(msg.message))
+				}
+				_ => {
+					debug!(target: "afg", "Skipping unknown message type");
+					return Ok(None);
+				}
 			}
-
-			// we ignore messages where the signature doesn't check out.
-			let res = check_message_sig::<Block>(
-				&msg.message,
-				&msg.id,
-				&msg.signature,
-				round,
-				set_id
-			);
-			Ok(res.map(move |()| msg).ok())
 		})
 		.filter_map(|x| x)
 		.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
@@ -322,10 +320,16 @@ impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N>
 				id: local_id,
 			};
 
+			let message = GossipMessage::VoteOrPrecommit(VoteOrPrecommitMessage::<Block> {
+				message: signed.clone(),
+				round: self.round,
+				set_id: self.set_id,
+			});
+
 			// announce our block hash to peers and propagate the
 			// message.
 			self.network.announce(self.round, self.set_id, target_hash);
-			self.network.send_message(self.round, self.set_id, signed.encode());
+			self.network.send_message(self.round, self.set_id, message.encode());
 
 			// forward the message to the inner sender.
 			let _ = self.sender.unbounded_send(signed);
@@ -392,34 +396,18 @@ pub(crate) fn outgoing_messages<Block: BlockT, N: Network<Block>>(
 fn check_compact_commit<Block: BlockT>(
 	msg: CompactCommit<Block>,
 	voters: &VoterSet<Ed25519AuthorityId>,
-	round: u64,
-	set_id: u64,
 ) -> Option<CompactCommit<Block>> {
-	use grandpa::Message as GrandpaMessage;
 	if msg.precommits.len() != msg.auth_data.len() || msg.precommits.is_empty() {
 		debug!(target: "afg", "Skipping malformed compact commit");
 		return None;
 	}
 
 	// check signatures on all contained precommits.
-	for (precommit, &(ref sig, ref id)) in msg.precommits.iter().zip(&msg.auth_data) {
+	for (_, ref id) in &msg.auth_data {
 		if !voters.contains_key(id) {
 			debug!(target: "afg", "Skipping commit containing unknown voter {}", id);
 			return None;
 		}
-
-		let res = check_message_sig::<Block>(
-			&GrandpaMessage::Precommit(precommit.clone()),
-			id,
-			sig,
-			round,
-			set_id,
-		);
-
-		if let Err(()) = res {
-			debug!(target: "afg", "Skipping commit containing bad message");
-			return None;
-		}
 	}
 
 	Some(msg)
@@ -428,7 +416,6 @@ fn check_compact_commit<Block: BlockT>(
 /// A stream for incoming commit messages. This checks all the signatures on the
 /// messages.
 pub(crate) fn checked_commit_stream<Block: BlockT, S>(
-	set_id: u64,
 	inner: S,
 	voters: Arc<VoterSet<Ed25519AuthorityId>>,
 )
@@ -438,14 +425,23 @@ pub(crate) fn checked_commit_stream<Block: BlockT, S>(
 	inner
 		.filter_map(|raw| {
 			// this could be optimized by decoding piecewise.
-			let decoded = <(u64, CompactCommit<Block>)>::decode(&mut &raw[..]);
+			let decoded = GossipMessage::<Block>::decode(&mut &raw[..]);
 			if decoded.is_none() {
 				trace!(target: "afg", "Skipping malformed commit message {:?}", raw);
 			}
 			decoded
 		})
-		.filter_map(move |(round, msg)| {
-			check_compact_commit::<Block>(msg, &*voters, round, set_id).map(move |c| (round, c))
+		.filter_map(move |msg| {
+			match msg {
+				GossipMessage::Commit(msg) => {
+					let round = msg.round;
+					check_compact_commit::<Block>(msg.message, &*voters).map(move |c| (round, c))
+				},
+				_ => {
+					debug!(target: "afg", "Skipping unknown message type");
+					return None;
+				}
+			}
 		})
 		.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
 }
@@ -491,7 +487,13 @@ impl<Block: BlockT, N: Network<Block>> Sink for CommitsOut<Block, N> {
 			auth_data
 		};
 
-		self.network.send_commit(round, self.set_id, Encode::encode(&(round, compact_commit)));
+		let message = FullCommitMessage::<Block> {
+			round: round,
+			set_id: self.set_id,
+			message: compact_commit,
+		};
+
+		self.network.send_commit(round, self.set_id, Encode::encode(&message));
 
 		Ok(AsyncSink::Ready)
 	}
diff --git a/substrate/core/finality-grandpa/src/environment.rs b/substrate/core/finality-grandpa/src/environment.rs
index d210f0fa8b01bc1dcc29a8e9a17daf927fea54f3..71424e8be9161089355263d97c8a9e59f9d78496 100644
--- a/substrate/core/finality-grandpa/src/environment.rs
+++ b/substrate/core/finality-grandpa/src/environment.rs
@@ -248,8 +248,6 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb
 		let precommit_timer = Delay::new(now + self.config.gossip_duration * 4);
 
 		let incoming = crate::communication::checked_message_stream::<Block, _>(
-			round,
-			self.set_id,
 			self.network.messages_for(round, self.set_id),
 			self.voters.clone(),
 		);
diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs
index 5400ae0dd017d52e78d8f569010c37e911506654..ad00980d3c4e413d64048e225a26b9b1f3d561a0 100644
--- a/substrate/core/finality-grandpa/src/lib.rs
+++ b/substrate/core/finality-grandpa/src/lib.rs
@@ -53,7 +53,7 @@
 //! included in the newly-finalized chain.
 
 use futures::prelude::*;
-use log::{debug, info, warn};
+use log::{debug, info, warn, trace};
 use futures::sync::{self, mpsc, oneshot};
 use client::{
 	BlockchainEvents, CallExecutor, Client, backend::Backend,
@@ -61,6 +61,7 @@ use client::{
 };
 use client::blockchain::HeaderBackend;
 use parity_codec::{Encode, Decode};
+use parity_codec_derive::{Encode, Decode};
 use runtime_primitives::traits::{
 	NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT,
 	DigestItemFor, DigestItem,
@@ -73,6 +74,7 @@ use grandpa::Error as GrandpaError;
 use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet};
 
 use network::Service as NetworkService;
+use network::consensus_gossip as network_gossip;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -106,6 +108,10 @@ const LAST_COMPLETED_KEY: &[u8] = b"grandpa_completed_round";
 const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters";
 const CONSENSUS_CHANGES_KEY: &[u8] = b"grandpa_consensus_changes";
 
+const GRANDPA_ENGINE_ID: network::ConsensusEngineId = [b'a', b'f', b'g', b'1'];
+
+const MESSAGE_ROUND_TOLERANCE: u64 = 2;
+
 /// round-number, round-state
 type LastCompleted<H, N> = (u64, RoundState<H, N>);
 
@@ -118,6 +124,25 @@ pub type SignedMessage<Block> = grandpa::SignedMessage<
 	ed25519::Signature,
 	Ed25519AuthorityId,
 >;
+
+/// Grandpa gossip message type.
+/// This is the root type that gets encoded and sent on the network.
+#[derive(Debug, Encode, Decode)]
+pub enum GossipMessage<Block: BlockT> {
+	/// Grandpa message with round and set info.
+	VoteOrPrecommit(VoteOrPrecommitMessage<Block>),
+	/// Grandpa commit message with round and set info.
+	Commit(FullCommitMessage<Block>),
+}
+
+/// Network level message with topic information.
+#[derive(Debug, Encode, Decode)]
+pub struct VoteOrPrecommitMessage<Block: BlockT> {
+	pub round: u64,
+	pub set_id: u64,
+	pub message: SignedMessage<Block>,
+}
+
 /// A prevote message for this chain's block type.
 pub type Prevote<Block> = grandpa::Prevote<<Block as BlockT>::Hash, NumberFor<Block>>;
 /// A precommit message for this chain's block type.
@@ -137,6 +162,14 @@ pub type CompactCommit<Block> = grandpa::CompactCommit<
 	Ed25519AuthorityId
 >;
 
+/// Network level commit message with topic information.
+#[derive(Debug, Encode, Decode)]
+pub struct FullCommitMessage<Block: BlockT> {
+	pub round: u64,
+	pub set_id: u64,
+	pub message: CompactCommit<Block>,
+}
+
 /// Configuration for the GRANDPA service.
 #[derive(Clone)]
 pub struct Config {
@@ -213,6 +246,142 @@ impl Stream for NetworkStream {
 	}
 }
 
+struct TopicTracker {
+	min_live_round: u64,
+	max_round: u64,
+	set_id: u64,
+}
+
+struct GossipValidator<Block: BlockT> {
+	rounds: parking_lot::RwLock<TopicTracker>,
+	_marker: ::std::marker::PhantomData<Block>,
+}
+
+impl<Block: BlockT> GossipValidator<Block> {
+	fn new() -> GossipValidator<Block> {
+		GossipValidator {
+			rounds: parking_lot::RwLock::new(TopicTracker {
+				min_live_round: 0,
+				max_round: 0,
+				set_id: 0,
+			}),
+			_marker: Default::default(),
+		}
+	}
+
+	fn note_round(&self, round: u64, set_id: u64) {
+		let mut rounds = self.rounds.write();
+		if set_id > rounds.set_id {
+			rounds.set_id = set_id;
+			rounds.max_round = 0;
+			rounds.min_live_round = 0;
+		}
+		rounds.max_round = rounds.max_round.max(round);
+	}
+
+	fn note_set(&self, _set_id: u64) {
+	}
+
+	fn drop_round(&self, round: u64, set_id: u64) {
+		let mut rounds = self.rounds.write();
+		if set_id == rounds.set_id && round >= rounds.min_live_round {
+			rounds.min_live_round = round + 1;
+		}
+	}
+
+	fn drop_set(&self, _set_id: u64) {
+	}
+
+	fn is_expired(&self, round: u64, set_id: u64) -> bool {
+		let rounds = self.rounds.read();
+		if set_id < rounds.set_id {
+			trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, rounds.set_id);
+			return true;
+		} else if set_id == rounds.set_id + 1 {
+			// allow a few first rounds of future set.
+			if round > MESSAGE_ROUND_TOLERANCE {
+				trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, rounds.set_id);
+				return true;
+			}
+		} else if set_id == rounds.set_id {
+			if round < rounds.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) {
+				trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, rounds.min_live_round, rounds.max_round);
+				return true;
+			}
+		} else {
+			trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, rounds.set_id);
+			return true;
+		}
+		false
+	}
+
+	fn validate_round_message(&self, full: VoteOrPrecommitMessage<Block>)
+		-> network_gossip::ValidationResult<Block::Hash>
+	{
+		if self.is_expired(full.round, full.set_id) {
+			return network_gossip::ValidationResult::Expired;
+		}
+
+		if let Err(()) = communication::check_message_sig::<Block>(
+			&full.message.message,
+			&full.message.id,
+			&full.message.signature,
+			full.round,
+			full.set_id
+		) {
+			debug!(target: "afg", "Bad message signature {}", full.message.id);
+			return network_gossip::ValidationResult::Invalid;
+		}
+
+		let topic = message_topic::<Block>(full.round, full.set_id);
+		network_gossip::ValidationResult::Valid(topic)
+	}
+
+	fn validate_commit_message(&self, full: FullCommitMessage<Block>)
+		-> network_gossip::ValidationResult<Block::Hash>
+	{
+		use grandpa::Message as GrandpaMessage;
+		if self.is_expired(full.round, full.set_id) {
+			return network_gossip::ValidationResult::Expired;
+		}
+
+		if full.message.precommits.len() != full.message.auth_data.len() || full.message.precommits.is_empty() {
+			debug!(target: "afg", "Malformed compact commit");
+			return network_gossip::ValidationResult::Invalid;
+		}
+
+		// check signatures on all contained precommits.
+		for (precommit, &(ref sig, ref id)) in full.message.precommits.iter().zip(&full.message.auth_data) {
+			if let Err(()) = communication::check_message_sig::<Block>(
+				&GrandpaMessage::Precommit(precommit.clone()),
+				id,
+				sig,
+				full.round,
+				full.set_id,
+			) {
+				debug!(target: "afg", "Bad commit message signature {}", id);
+				return network_gossip::ValidationResult::Invalid;
+			}
+		}
+
+		let topic = commit_topic::<Block>(full.set_id);
+		network_gossip::ValidationResult::Valid(topic)
+	}
+}
+
+impl<Block: BlockT> network_gossip::Validator<Block::Hash> for GossipValidator<Block> {
+	fn validate(&self, mut data: &[u8]) -> network_gossip::ValidationResult<Block::Hash> {
+		match GossipMessage::<Block>::decode(&mut data) {
+			Some(GossipMessage::VoteOrPrecommit(message)) => self.validate_round_message(message),
+			Some(GossipMessage::Commit(message)) => self.validate_commit_message(message),
+			None => {
+				debug!(target: "afg", "Error decoding message");
+				network_gossip::ValidationResult::Invalid
+			}
+		}
+	}
+}
+
 /// A handle to the network. This is generally implemented by providing some
 /// handle to a gossip service or similar.
 ///
@@ -247,20 +416,27 @@ pub trait Network<Block: BlockT>: Clone {
 
 ///  Bridge between NetworkService, gossiping consensus messages and Grandpa
 pub struct NetworkBridge<B: BlockT, S: network::specialization::NetworkSpecialization<B>> {
-	service: Arc<NetworkService<B, S>>
+	service: Arc<NetworkService<B, S>>,
+	validator: Arc<GossipValidator<B>>,
 }
 
 impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>> NetworkBridge<B, S> {
 	/// Create a new NetworkBridge to the given NetworkService
 	pub fn new(service: Arc<NetworkService<B, S>>) -> Self {
-		NetworkBridge { service }
+		let validator = Arc::new(GossipValidator::new());
+		let v = validator.clone();
+		service.with_gossip(move |gossip, _| {
+			gossip.register_validator(GRANDPA_ENGINE_ID, v);
+		});
+		NetworkBridge { service, validator: validator }
 	}
 }
 
 impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Clone for NetworkBridge<B, S> {
 	fn clone(&self) -> Self {
 		NetworkBridge {
-			service: Arc::clone(&self.service)
+			service: Arc::clone(&self.service),
+			validator: Arc::clone(&self.validator),
 		}
 	}
 }
@@ -276,6 +452,7 @@ fn commit_topic<B: BlockT>(set_id: u64) -> B::Hash {
 impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B> for NetworkBridge<B, S> {
 	type In = NetworkStream;
 	fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
+		self.validator.note_round(round, set_id);
 		let (tx, rx) = sync::oneshot::channel();
 		self.service.with_gossip(move |gossip, _| {
 			let inner_rx = gossip.messages_for(message_topic::<B>(round, set_id));
@@ -286,20 +463,21 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B
 
 	fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) {
 		let topic = message_topic::<B>(round, set_id);
-		self.service.gossip_consensus_message(topic, message, false);
+		self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message);
 	}
 
 	fn drop_round_messages(&self, round: u64, set_id: u64) {
-		let topic = message_topic::<B>(round, set_id);
-		self.service.with_gossip(move |gossip, _| gossip.collect_garbage_for_topic(topic));
+		self.validator.drop_round(round, set_id);
+		self.service.with_gossip(move |gossip, _| gossip.collect_garbage());
 	}
 
 	fn drop_set_messages(&self, set_id: u64) {
-		let topic = commit_topic::<B>(set_id);
-		self.service.with_gossip(move |gossip, _| gossip.collect_garbage_for_topic(topic));
+		self.validator.drop_set(set_id);
+		self.service.with_gossip(move |gossip, _| gossip.collect_garbage());
 	}
 
 	fn commit_messages(&self, set_id: u64) -> Self::In {
+		self.validator.note_set(set_id);
 		let (tx, rx) = sync::oneshot::channel();
 		self.service.with_gossip(move |gossip, _| {
 			let inner_rx = gossip.messages_for(commit_topic::<B>(set_id));
@@ -310,7 +488,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B
 
 	fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>) {
 		let topic = commit_topic::<B>(set_id);
-		self.service.gossip_consensus_message(topic, message, false);
+		self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message);
 	}
 
 	fn announce(&self, round: u64, _set_id: u64, block: B::Hash) {
@@ -439,7 +617,6 @@ fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
 {
 	// verification stream
 	let commit_in = crate::communication::checked_commit_stream::<Block, _>(
-		set_id,
 		network.commit_messages(set_id),
 		voters.clone(),
 	);
diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs
index ac64b7d24899d8ea391c16e15ee87282c3c8a6a3..029e517943ad5dcb7ef452d7d1892886dfd354c0 100644
--- a/substrate/core/finality-grandpa/src/tests.rs
+++ b/substrate/core/finality-grandpa/src/tests.rs
@@ -139,13 +139,24 @@ impl TestNetFactory for GrandpaTestNet {
 struct MessageRouting {
 	inner: Arc<Mutex<GrandpaTestNet>>,
 	peer_id: usize,
+	validator: Arc<GossipValidator<Block>>,
 }
 
 impl MessageRouting {
 	fn new(inner: Arc<Mutex<GrandpaTestNet>>, peer_id: usize,) -> Self {
+		let validator = Arc::new(GossipValidator::new());
+		let v = validator.clone();
+		{
+			let inner = inner.lock();
+			let peer = inner.peer(peer_id);
+			peer.with_gossip(move |gossip, _| {
+				gossip.register_validator(GRANDPA_ENGINE_ID, v);
+			});
+		}
 		MessageRouting {
 			inner,
 			peer_id,
+			validator,
 		}
 	}
 
@@ -157,37 +168,18 @@ impl MessageRouting {
 }
 
 fn make_topic(round: u64, set_id: u64) -> Hash {
-	let mut hash = Hash::default();
-	round.using_encoded(|s| {
-		let raw = hash.as_mut();
-		raw[..8].copy_from_slice(s);
-	});
-	set_id.using_encoded(|s| {
-		let raw = hash.as_mut();
-		raw[8..16].copy_from_slice(s);
-	});
-	hash
+	message_topic::<Block>(round, set_id)
 }
 
 fn make_commit_topic(set_id: u64) -> Hash {
-	let mut hash = Hash::default();
-
-	{
-		let raw = hash.as_mut();
-		raw[16..22].copy_from_slice(b"commit");
-	}
-	set_id.using_encoded(|s| {
-		let raw = hash.as_mut();
-		raw[24..].copy_from_slice(s);
-	});
-
-	hash
+	commit_topic::<Block>(set_id)
 }
 
 impl Network<Block> for MessageRouting {
 	type In = Box<Stream<Item=Vec<u8>,Error=()> + Send>;
 
 	fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
+		self.validator.note_round(round, set_id);
 		let inner = self.inner.lock();
 		let peer = inner.peer(self.peer_id);
 		let messages = peer.consensus_gossip_messages_for(make_topic(round, set_id));
@@ -201,20 +193,23 @@ impl Network<Block> for MessageRouting {
 
 	fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) {
 		let inner = self.inner.lock();
-		inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), message, false);
+		inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), GRANDPA_ENGINE_ID, message);
 	}
 
 	fn drop_round_messages(&self, round: u64, set_id: u64) {
+		self.validator.drop_round(round, set_id);
 		let topic = make_topic(round, set_id);
 		self.drop_messages(topic);
 	}
 
 	fn drop_set_messages(&self, set_id: u64) {
+		self.validator.drop_set(set_id);
 		let topic = make_commit_topic(set_id);
 		self.drop_messages(topic);
 	}
 
 	fn commit_messages(&self, set_id: u64) -> Self::In {
+		self.validator.note_set(set_id);
 		let inner = self.inner.lock();
 		let peer = inner.peer(self.peer_id);
         let messages = peer.consensus_gossip_messages_for(make_commit_topic(set_id));
@@ -228,7 +223,7 @@ impl Network<Block> for MessageRouting {
 
 	fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>) {
 		let inner = self.inner.lock();
-		inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message, false);
+		inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), GRANDPA_ENGINE_ID, message);
 	}
 
 	fn announce(&self, _round: u64, _set_id: u64, _block: H256) {
@@ -516,6 +511,7 @@ fn finalize_3_voters_1_observer() {
 
 #[test]
 fn transition_3_voters_twice_1_observer() {
+	let _ = env_logger::try_init();
 	let peers_a = &[
 		Keyring::Alice,
 		Keyring::Bob,
diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs
index 68f5db835fbd9e70a8c9d728a7d4753f0858c47a..720cf55a67b75e3697bcd96b80b6274cc7b6c9bc 100644
--- a/substrate/core/network/src/consensus_gossip.rs
+++ b/substrate/core/network/src/consensus_gossip.rs
@@ -18,20 +18,22 @@
 //! Handles chain-specific and standard BFT messages.
 
 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 use std::time::{Instant, Duration};
 use log::{trace, debug};
 use futures::sync::mpsc;
 use rand::{self, seq::SliceRandom};
 use lru_cache::LruCache;
-use network_libp2p::NodeIndex;
+use network_libp2p::{Severity, NodeIndex};
 use runtime_primitives::traits::{Block as BlockT, Hash, HashFor};
 pub use crate::message::generic::{Message, ConsensusMessage};
 use crate::protocol::Context;
 use crate::config::Roles;
+use crate::ConsensusEngineId;
 
 // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
 const MESSAGE_LIFETIME: Duration = Duration::from_secs(120);
-const DEAD_TOPICS_CACHE_SIZE: usize = 4096;
+const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096;
 
 struct PeerConsensus<H> {
 	known_messages: HashSet<H>,
@@ -39,21 +41,35 @@ struct PeerConsensus<H> {
 }
 
 struct MessageEntry<B: BlockT> {
-	topic: B::Hash,
 	message_hash: B::Hash,
+	topic: B::Hash,
 	message: ConsensusMessage,
-	broadcast: bool,
+	timestamp: Instant,
+}
+
+/// Message validation result.
+pub enum ValidationResult<H> {
+	/// Message is valid with this topic.
+	Valid(H),
+	/// Invalid message.
+	Invalid,
+	/// Obsolete message.
+	Expired,
+}
+
+/// Validates consensus messages.
+pub trait Validator<H> {
+	/// Validate consensus message.
+	fn validate(&self, data: &[u8]) -> ValidationResult<H>;
 }
 
 /// Consensus network protocol handler. Manages statements and candidate requests.
 pub struct ConsensusGossip<B: BlockT> {
-	peers: HashMap<NodeIndex, PeerConsensus<(B::Hash, B::Hash)>>,
-	live_message_sinks: HashMap<B::Hash, Vec<mpsc::UnboundedSender<ConsensusMessage>>>,
+	peers: HashMap<NodeIndex, PeerConsensus<B::Hash>>,
+	live_message_sinks: HashMap<B::Hash, Vec<mpsc::UnboundedSender<Vec<u8>>>>,
 	messages: Vec<MessageEntry<B>>,
-	known_messages: HashSet<(B::Hash, B::Hash)>,
-	known_dead_topics: LruCache<B::Hash, ()>,
-	message_times: HashMap<(B::Hash, B::Hash), Instant>,
-	session_start: Option<B::Hash>,
+	known_messages: LruCache<B::Hash, ()>,
+	validators: HashMap<ConsensusEngineId, Arc<Validator<B::Hash>>>,
 }
 
 impl<B: BlockT> ConsensusGossip<B> {
@@ -63,10 +79,8 @@ impl<B: BlockT> ConsensusGossip<B> {
 			peers: HashMap::new(),
 			live_message_sinks: HashMap::new(),
 			messages: Default::default(),
-			known_messages: Default::default(),
-			known_dead_topics: LruCache::new(DEAD_TOPICS_CACHE_SIZE),
-			message_times: Default::default(),
-			session_start: None
+			known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE),
+			validators: Default::default(),
 		}
 	}
 
@@ -75,15 +89,22 @@ impl<B: BlockT> ConsensusGossip<B> {
 		self.live_message_sinks.clear();
 	}
 
+	/// Register message validator for a message type.
+	pub fn register_validator(&mut self, engine_id: ConsensusEngineId, validator: Arc<Validator<B::Hash>>) {
+		self.validators.insert(engine_id, validator);
+	}
+
 	/// Handle new connected peer.
 	pub fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex, roles: Roles) {
 		if roles.intersects(Roles::AUTHORITY) {
 			trace!(target:"gossip", "Registering {:?} {}", roles, who);
+			let now = Instant::now();
 			// Send out all known messages to authorities.
 			let mut known_messages = HashSet::new();
 			for entry in self.messages.iter() {
-				known_messages.insert((entry.topic, entry.message_hash));
-				protocol.send_message(who, Message::Consensus(entry.topic.clone(), entry.message.clone(), entry.broadcast));
+				if entry.timestamp + MESSAGE_LIFETIME < now { continue };
+				known_messages.insert(entry.message_hash);
+				protocol.send_message(who, Message::Consensus(entry.message.clone()));
 			}
 			self.peers.insert(who, PeerConsensus {
 				known_messages,
@@ -102,30 +123,18 @@ impl<B: BlockT> ConsensusGossip<B> {
 		&mut self,
 		protocol: &mut Context<B>,
 		message_hash: B::Hash,
-		topic: B::Hash,
-		broadcast: bool,
 		get_message: F,
 	)
 		where F: Fn() -> ConsensusMessage,
 	{
-		if broadcast {
-			for (id, ref mut peer) in self.peers.iter_mut() {
-				if peer.known_messages.insert((topic.clone(), message_hash.clone())) {
-					let message = get_message();
-					if peer.is_authority {
-						trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message);
-					} else {
-						trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
-					}
-					protocol.send_message(*id, Message::Consensus(topic, message, broadcast));
-				}
-			}
-
-			return;
-		}
-
 		let mut non_authorities: Vec<_> = self.peers.iter()
-			.filter_map(|(id, ref peer)| if !peer.is_authority && !peer.known_messages.contains(&(topic, message_hash)) { Some(*id) } else { None })
+			.filter_map(|(id, ref peer)|
+				if !peer.is_authority && !peer.known_messages.contains(&message_hash) {
+					Some(*id)
+				} else {
+					None
+				}
+			)
 			.collect();
 
 		non_authorities.shuffle(&mut rand::thread_rng());
@@ -137,34 +146,33 @@ impl<B: BlockT> ConsensusGossip<B> {
 
 		for (id, ref mut peer) in self.peers.iter_mut() {
 			if peer.is_authority {
-				if peer.known_messages.insert((topic.clone(), message_hash.clone())) {
+				if peer.known_messages.insert(message_hash.clone()) {
 					let message = get_message();
 					trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message);
-					protocol.send_message(*id, Message::Consensus(topic, message, broadcast));
+					protocol.send_message(*id, Message::Consensus(message));
 				}
 			} else if non_authorities.contains(&id) {
-				let message = get_message();
-				trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
-				peer.known_messages.insert((topic.clone(), message_hash.clone()));
-				protocol.send_message(*id, Message::Consensus(topic, message, broadcast));
+				if peer.known_messages.insert(message_hash.clone()) {
+					let message = get_message();
+					trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
+					protocol.send_message(*id, Message::Consensus(message));
+				}
 			}
 		}
 	}
 
-	fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, broadcast: bool, get_message: F)
+	fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, get_message: F)
 		where F: Fn() -> ConsensusMessage
 	{
-		if !self.known_dead_topics.contains_key(&topic) &&
-			self.known_messages.insert((topic, message_hash))
+		if self.known_messages.insert(message_hash, ()).is_none()
 		{
 			self.messages.push(MessageEntry {
 				topic,
 				message_hash,
-				broadcast,
 				message: get_message(),
+				timestamp: Instant::now(),
 			});
 
-			self.message_times.insert((topic, message_hash), Instant::now());
 		}
 	}
 
@@ -173,38 +181,27 @@ impl<B: BlockT> ConsensusGossip<B> {
 		self.peers.remove(&who);
 	}
 
-	/// Prune all existing messages for the given topic and mark it as dead, all
-	/// new messages for the given topic are ignored.
-	pub fn collect_garbage_for_topic(&mut self, topic: B::Hash) {
-		self.known_dead_topics.insert(topic, ());
-		self.collect_garbage(|_| true);
-	}
-
 	/// Prune old or no longer relevant consensus messages. Provide a predicate
 	/// for pruning, which returns `false` when the items with a given topic should be pruned.
-	pub fn collect_garbage<P: Fn(&B::Hash) -> bool>(&mut self, predicate: P) {
+	pub fn collect_garbage(&mut self) {
 		self.live_message_sinks.retain(|_, sinks| {
 			sinks.retain(|sink| !sink.is_closed());
 			!sinks.is_empty()
 		});
 
-		let message_times = &mut self.message_times;
 		let known_messages = &mut self.known_messages;
-		let known_dead_topics = &mut self.known_dead_topics;
 		let before = self.messages.len();
+		let validators = &self.validators;
 		let now = Instant::now();
 
 		self.messages.retain(|entry| {
-			!known_dead_topics.contains_key(&entry.topic) &&
-				message_times.get(&(entry.topic, entry.message_hash))
-					.map(|instant| *instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic))
-					.unwrap_or(false)
-		});
-
-		known_messages.retain(|(topic, message_hash)| {
-			message_times.get(&(*topic, *message_hash))
-				.map(|instant| *instant + (5 * MESSAGE_LIFETIME) >= now)
-				.unwrap_or(false)
+			entry.timestamp + MESSAGE_LIFETIME >= now
+			&& match validators.get(&entry.message.engine_id)
+				.map(|v| v.validate(&entry.message.data))
+			{
+				Some(ValidationResult::Valid(_)) => true,
+				_ => false,
+			}
 		});
 
 		trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)",
@@ -213,18 +210,16 @@ impl<B: BlockT> ConsensusGossip<B> {
 			known_messages.len(),
 		);
 
-		message_times.retain(|h, _| known_messages.contains(h));
-
 		for (_, ref mut peer) in self.peers.iter_mut() {
-			peer.known_messages.retain(|h| known_messages.contains(h));
+			peer.known_messages.retain(|h| known_messages.contains_key(h));
 		}
 	}
 
-	/// Get all incoming messages for a topic.
-	pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver<ConsensusMessage> {
+	/// Get data of valid, incoming messages for a topic (but might have expired meanwhile)
+	pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver<Vec<u8>> {
 		let (tx, rx) = mpsc::unbounded();
 		for entry in self.messages.iter().filter(|e| e.topic == topic) {
-			tx.unbounded_send(entry.message.clone()).expect("receiver known to be live; qed");
+			tx.unbounded_send(entry.message.data.clone()).expect("receiver known to be live; qed");
 		}
 		self.live_message_sinks.entry(topic).or_default().push(tx);
 
@@ -239,29 +234,57 @@ impl<B: BlockT> ConsensusGossip<B> {
 		&mut self,
 		protocol: &mut Context<B>,
 		who: NodeIndex,
-		topic: B::Hash,
 		message: ConsensusMessage,
-		broadcast: bool,
+		is_syncing: bool,
 	) -> Option<(B::Hash, ConsensusMessage)> {
-		let message_hash = HashFor::<B>::hash(&message[..]);
-
-		if self.known_dead_topics.contains_key(&topic) {
-			trace!(target:"gossip", "Ignored message from {} in dead topic {}", who, topic);
-			return None;
-		}
+		let message_hash = HashFor::<B>::hash(&message.data[..]);
 
-		if self.known_messages.contains(&(topic, message_hash)) {
-			trace!(target:"gossip", "Ignored already known message from {} in {}", who, topic);
+		if self.known_messages.contains_key(&message_hash) {
+			trace!(target:"gossip", "Ignored already known message from {}", who);
 			return None;
 		}
 
 		if let Some(ref mut peer) = self.peers.get_mut(&who) {
 			use std::collections::hash_map::Entry;
-			peer.known_messages.insert((topic, message_hash));
+
+			//validate the message
+			let topic = match self.validators.get(&message.engine_id)
+				.map(|v| v.validate(&message.data))
+			{
+				Some(ValidationResult::Valid(topic)) => topic,
+				Some(ValidationResult::Invalid) => {
+					trace!(target:"gossip", "Invalid message from {}", who);
+					protocol.report_peer(
+						who,
+						Severity::Bad(format!("Sent invalid consensus message")),
+					);
+					return None;
+				},
+				Some(ValidationResult::Expired) => {
+					trace!(target:"gossip", "Ignored expired message from {}", who);
+					if !is_syncing {
+						protocol.report_peer(
+							who,
+							Severity::Useless(format!("Sent expired consensus message")),
+						);
+					}
+					return None;
+				}
+				None => {
+					protocol.report_peer(
+						who,
+						Severity::Useless(format!("Sent unknown consensus engine id")),
+					);
+					trace!(target:"gossip", "Unknown message engine id {:?} from {}", message.engine_id, who);
+					return None;
+				}
+			};
+
+			peer.known_messages.insert(message_hash);
 			if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) {
 				debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
 				entry.get_mut().retain(|sink| {
-					if let Err(e) = sink.unbounded_send(message.clone()) {
+					if let Err(e) = sink.unbounded_send(message.data.clone()) {
 						trace!(target:"gossip", "Error broadcasting message notification: {:?}", e);
 					}
 					!sink.is_closed()
@@ -270,13 +293,12 @@ impl<B: BlockT> ConsensusGossip<B> {
 					entry.remove_entry();
 				}
 			}
+			self.multicast_inner(protocol, message_hash, topic, || message.clone());
+			Some((topic, message))
 		} else {
 			trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
-			return None;
+			None
 		}
-
-		self.multicast_inner(protocol, message_hash, topic, broadcast, || message.clone());
-		Some((topic, message))
 	}
 
 	/// Multicast a message to all peers.
@@ -285,10 +307,9 @@ impl<B: BlockT> ConsensusGossip<B> {
 		protocol: &mut Context<B>,
 		topic: B::Hash,
 		message: ConsensusMessage,
-		broadcast: bool,
 	) {
-		let message_hash = HashFor::<B>::hash(&message);
-		self.multicast_inner(protocol, message_hash, topic, broadcast, || message.clone());
+		let message_hash = HashFor::<B>::hash(&message.data);
+		self.multicast_inner(protocol, message_hash, topic, || message.clone());
 	}
 
 	fn multicast_inner<F>(
@@ -296,20 +317,17 @@ impl<B: BlockT> ConsensusGossip<B> {
 		protocol: &mut Context<B>,
 		message_hash: B::Hash,
 		topic: B::Hash,
-		broadcast: bool,
 		get_message: F,
 	)
 		where F: Fn() -> ConsensusMessage
 	{
-		self.register_message(message_hash, topic, broadcast, &get_message);
-		self.propagate(protocol, message_hash, topic, broadcast, get_message);
+		self.register_message(message_hash, topic, &get_message);
+		self.propagate(protocol, message_hash, get_message);
 	}
 
 	/// Note new consensus session.
-	pub fn new_session(&mut self, parent_hash: B::Hash) {
-		let old_session = self.session_start.take();
-		self.session_start = Some(parent_hash);
-		self.collect_garbage(|topic| old_session.as_ref().map_or(true, |h| topic != h));
+	pub fn new_session(&mut self, _parent_hash: B::Hash) {
+		self.collect_garbage();
 	}
 }
 
@@ -323,91 +341,75 @@ mod tests {
 
 	macro_rules! push_msg {
 		($consensus:expr, $topic:expr, $hash: expr, $now: expr, $m:expr) => {
-			if $consensus.known_messages.insert(($topic, $hash)) {
+			if $consensus.known_messages.insert($hash, ()).is_none() {
 				$consensus.messages.push(MessageEntry {
 					topic: $topic,
 					message_hash: $hash,
-					message: $m,
-					broadcast: false,
+					message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]},
+					timestamp: $now,
 				});
-				$consensus.message_times.insert(($topic, $hash), $now);
 			}
 		}
 	}
 
 	#[test]
 	fn collects_garbage() {
+
+		struct AllowAll;
+		impl Validator<H256> for AllowAll {
+			fn validate(&self, _data: &[u8]) -> ValidationResult<H256> {
+				ValidationResult::Valid(H256::default())
+			}
+		}
+
+		struct AllowOne;
+		impl Validator<H256> for AllowOne {
+			fn validate(&self, data: &[u8]) -> ValidationResult<H256> {
+				if data[0] == 1 {
+					ValidationResult::Valid(H256::default())
+				} else {
+					ValidationResult::Expired
+				}
+			}
+		}
+
 		let prev_hash = H256::random();
 		let best_hash = H256::random();
 		let mut consensus = ConsensusGossip::<Block>::new();
-		let now = Instant::now();
 		let m1_hash = H256::random();
 		let m2_hash = H256::random();
 		let m1 = vec![1, 2, 3];
 		let m2 = vec![4, 5, 6];
 
+		let now = Instant::now();
 		push_msg!(consensus, prev_hash, m1_hash, now, m1);
 		push_msg!(consensus, best_hash, m2_hash, now, m2.clone());
-		consensus.known_messages.insert((prev_hash, m1_hash));
-		consensus.known_messages.insert((best_hash, m2_hash));
+		consensus.known_messages.insert(m1_hash, ());
+		consensus.known_messages.insert(m2_hash, ());
 
-		// nothing to collect
-		consensus.collect_garbage(|_t| true);
+		let test_engine_id = Default::default();
+		consensus.register_validator(test_engine_id, Arc::new(AllowAll));
+		consensus.collect_garbage();
 		assert_eq!(consensus.messages.len(), 2);
 		assert_eq!(consensus.known_messages.len(), 2);
 
-		// nothing to collect with default.
-		consensus.collect_garbage(|&topic| topic != Default::default());
-		assert_eq!(consensus.messages.len(), 2);
-		assert_eq!(consensus.known_messages.len(), 2);
+		consensus.register_validator(test_engine_id, Arc::new(AllowOne));
 
-		// topic that was used in one message.
-		consensus.collect_garbage(|topic| topic != &prev_hash);
+		// m2 is expired
+		consensus.collect_garbage();
 		assert_eq!(consensus.messages.len(), 1);
-		// known messages are only pruned based on expiration time
+		// known messages are only pruned based on size.
 		assert_eq!(consensus.known_messages.len(), 2);
-		assert!(consensus.known_messages.contains(&(best_hash, m2_hash)));
+		assert!(consensus.known_messages.contains_key(&m2_hash));
 
 		// make timestamp expired, but the message is still kept as known
 		consensus.messages.clear();
 		consensus.known_messages.clear();
+		consensus.register_validator(test_engine_id, Arc::new(AllowAll));
 		push_msg!(consensus, best_hash, m2_hash, now - MESSAGE_LIFETIME, m2.clone());
-		consensus.collect_garbage(|_topic| true);
+		consensus.collect_garbage();
 		assert!(consensus.messages.is_empty());
 		assert_eq!(consensus.known_messages.len(), 1);
-
-		// make timestamp expired past the known message lifetime
-		consensus.known_messages.clear();
-		push_msg!(consensus, best_hash, m2_hash, now - (5 * MESSAGE_LIFETIME), m2);
-		consensus.collect_garbage(|_topic| true);
-		assert!(consensus.messages.is_empty());
-		assert!(consensus.known_messages.is_empty());
-	}
-
-	#[test]
-	fn collects_garbage_for_topic() {
-		let topic = H256::random();
-		let dead_topic = H256::random();
-		let message = Vec::new();
-		let now = Instant::now();
-		let mut consensus = ConsensusGossip::<Block>::new();
-
-		let message_hash = H256::random();
-		push_msg!(consensus, topic, message_hash, now, message.clone());
-		push_msg!(consensus, dead_topic, message_hash, now, message.clone());
-		assert_eq!(consensus.messages.len(), 2);
-
-		consensus.collect_garbage_for_topic(topic);
-
-		// removes all messages for the topic and marks the topic as dead
-		assert_eq!(consensus.messages.len(), 1);
-		assert_eq!(consensus.known_messages.len(), 2);
-		assert_eq!(consensus.known_dead_topics.len(), 1);
-
-		// new messages for dead topics are ignored
-		consensus.register_message(HashFor::<Block>::hash(&message), topic, false, || message.clone());
-		assert_eq!(consensus.messages.len(), 1);
-		assert_eq!(consensus.known_messages.len(), 2);
 	}
 
 	#[test]
@@ -416,15 +418,15 @@ mod tests {
 
 		let mut consensus = ConsensusGossip::<Block>::new();
 
-		let message = vec![1, 2, 3];
+		let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
 
-		let message_hash = HashFor::<Block>::hash(&message);
+		let message_hash = HashFor::<Block>::hash(&message.data);
 		let topic = HashFor::<Block>::hash(&[1,2,3]);
 
-		consensus.register_message(message_hash, topic, false, || message.clone());
+		consensus.register_message(message_hash, topic, || message.clone());
 		let stream = consensus.messages_for(topic);
 
-		assert_eq!(stream.wait().next(), Some(Ok(message)));
+		assert_eq!(stream.wait().next(), Some(Ok(message.data)));
 	}
 
 	#[test]
@@ -432,11 +434,11 @@ mod tests {
 		let mut consensus = ConsensusGossip::<Block>::new();
 
 		let topic = [1; 32].into();
-		let msg_a = vec![1, 2, 3];
-		let msg_b = vec![4, 5, 6];
+		let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
+		let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
 
-		consensus.register_message(HashFor::<Block>::hash(&msg_a), topic, false, || msg_a.clone());
-		consensus.register_message(HashFor::<Block>::hash(&msg_b), topic, false, || msg_b.clone());
+		consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, || msg_a.clone());
+		consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, || msg_b.clone());
 
 		assert_eq!(consensus.messages.len(), 2);
 	}
@@ -447,17 +449,17 @@ mod tests {
 
 		let mut consensus = ConsensusGossip::<Block>::new();
 
-		let message = vec![1, 2, 3];
+		let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
 
-		let message_hash = HashFor::<Block>::hash(&message);
+		let message_hash = HashFor::<Block>::hash(&message.data);
 		let topic = HashFor::<Block>::hash(&[1,2,3]);
 
-		consensus.register_message(message_hash, topic, false, || message.clone());
+		consensus.register_message(message_hash, topic, || message.clone());
 
 		let stream1 = consensus.messages_for(topic);
 		let stream2 = consensus.messages_for(topic);
 
-		assert_eq!(stream1.wait().next(), Some(Ok(message.clone())));
-		assert_eq!(stream2.wait().next(), Some(Ok(message)));
+		assert_eq!(stream1.wait().next(), Some(Ok(message.data.clone())));
+		assert_eq!(stream2.wait().next(), Some(Ok(message.data)));
 	}
 }
diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs
index dc3a2dfb2f0cd53c7dbaa63230ce3bb30e4f3602..7a86c9e86efd3761d666b69620482a79dfbb1745 100644
--- a/substrate/core/network/src/lib.rs
+++ b/substrate/core/network/src/lib.rs
@@ -44,7 +44,7 @@ pub use network_libp2p::{
     NodeIndex, ProtocolId, Severity, Protocol, Multiaddr,
     obtain_private_key, multiaddr, PeerId, PublicKey
 };
-pub use message::{generic as generic_message, RequestId, Status as StatusMessage};
+pub use message::{generic as generic_message, RequestId, Status as StatusMessage, ConsensusEngineId};
 pub use error::Error;
 pub use on_demand::{OnDemand, OnDemandService, RemoteResponse};
 #[doc(hidden)]
diff --git a/substrate/core/network/src/message.rs b/substrate/core/network/src/message.rs
index 0aff9166b645dc0cda5ce6487d4a68332c7bcf8b..b54032d317fcd244bbf26e57b3718cc0ed6e0cbf 100644
--- a/substrate/core/network/src/message.rs
+++ b/substrate/core/network/src/message.rs
@@ -30,6 +30,9 @@ pub use self::generic::{
 /// A unique ID of a request.
 pub type RequestId = u64;
 
+/// Consensus engine unique ID.
+pub type ConsensusEngineId = [u8; 4];
+
 /// Type alias for using the message type using block type parameters.
 pub type Message<B> = generic::Message<
 	<B as BlockT>::Header,
@@ -132,10 +135,16 @@ pub mod generic {
 	use crate::config::Roles;
 	use super::{
 		BlockAttributes, RemoteCallResponse, RemoteReadResponse,
-		RequestId, Transactions, Direction
+		RequestId, Transactions, Direction, ConsensusEngineId,
 	};
-	/// Consensus is opaque to us
-	pub type ConsensusMessage = Vec<u8>;
+	/// Consensus is mostly opaque to us
+	#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
+	pub struct ConsensusMessage {
+		/// Identifies consensus engine.
+		pub engine_id: ConsensusEngineId,
+		/// Message payload.
+		pub data: Vec<u8>,
+	}
 
 	/// Block data sent in the response.
 	#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
@@ -177,7 +186,7 @@ pub mod generic {
 		/// Transactions.
 		Transactions(Transactions<Extrinsic>),
 		/// Consensus protocol message.
-		Consensus(Hash, ConsensusMessage, bool), // topic, opaque Vec<u8>, broadcast
+		Consensus(ConsensusMessage),
 		/// Remote method call request.
 		RemoteCallRequest(RemoteCallRequest<Hash>),
 		/// Remote method call response.
@@ -216,6 +225,8 @@ pub mod generic {
 	pub struct Status<Hash, Number> {
 		/// Protocol version.
 		pub version: u32,
+		/// Minimum supported version.
+		pub min_supported_version: u32,
 		/// Supported roles.
 		pub roles: Roles,
 		/// Best block number.
diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs
index b8c88c8624470554c50a8fae8b9ffa61c7101a76..5840371e32a873865e269f092b757a866dddc549 100644
--- a/substrate/core/network/src/protocol.rs
+++ b/substrate/core/network/src/protocol.rs
@@ -20,8 +20,8 @@ use primitives::storage::StorageKey;
 use runtime_primitives::generic::BlockId;
 use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor, Zero};
 use consensus::import_queue::ImportQueue;
-use crate::message::{self, Message};
-use crate::message::generic::Message as GenericMessage;
+use crate::message::{self, Message, ConsensusEngineId};
+use crate::message::generic::{Message as GenericMessage, ConsensusMessage};
 use crate::consensus_gossip::ConsensusGossip;
 use crate::on_demand::OnDemandService;
 use crate::specialization::NetworkSpecialization;
@@ -44,7 +44,9 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1000);
 const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(5000);
 
 /// Current protocol version.
-pub(crate) const CURRENT_VERSION: u32 = 1;
+pub(crate) const CURRENT_VERSION: u32 = 2;
+/// Lowest version we support
+const MIN_VERSION: u32 = 2;
 
 // Maximum allowed entries in `BlockResponse`
 const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
@@ -199,7 +201,7 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> {
 	/// Execute a closure with the consensus gossip.
 	ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>),
 	/// Incoming gossip consensus message.
-	GossipConsensusMessage(B::Hash, Vec<u8>, bool),
+	GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>),
 	/// Is protocol currently major-syncing?
 	IsMajorSyncing(Sender<bool>),
 	/// Is protocol currently offline?
@@ -327,8 +329,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 					ProtocolContext::new(&mut self.context_data, &self.network_chan);
 				task.call_box(&mut self.consensus_gossip, &mut context);
 			}
-			ProtocolMsg::GossipConsensusMessage(topic, message, broadcast) => {
-				self.gossip_consensus_message(topic, message, broadcast)
+			ProtocolMsg::GossipConsensusMessage(topic, engine_id, message) => {
+				self.gossip_consensus_message(topic, engine_id, message)
 			}
 			ProtocolMsg::IsMajorSyncing(sender) => {
 				let is_syncing = self.sync.status().is_major_syncing();
@@ -420,13 +422,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 			GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(who, response),
 			GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(who, request),
 			GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(who, response),
-			GenericMessage::Consensus(topic, msg, broadcast) => {
+			GenericMessage::Consensus(msg) => {
 				self.consensus_gossip.on_incoming(
 					&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
 					who,
-					topic,
 					msg,
-					broadcast,
+					self.sync.status().is_major_syncing(),
 				);
 			}
 			other => self.specialization.on_message(
@@ -446,12 +447,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 		);
 	}
 
-	fn gossip_consensus_message(&mut self, topic: B::Hash, message: Vec<u8>, broadcast: bool) {
+	fn gossip_consensus_message(&mut self, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec<u8>) {
 		self.consensus_gossip.multicast(
 			&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
 			topic,
-			message,
-			broadcast,
+			ConsensusMessage{ data: message, engine_id },
 		);
 	}
 
@@ -599,7 +599,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 
 	/// Perform time based maintenance.
 	fn tick(&mut self) {
-		self.consensus_gossip.collect_garbage(|_| true);
+		self.consensus_gossip.collect_garbage();
 		self.maintain_peers();
 		self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan));
 		self.on_demand
@@ -653,7 +653,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 				));
 				return;
 			}
-			if status.version != CURRENT_VERSION {
+			if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version {
 				let reason = format!("Peer using unsupported protocol version {}", status.version);
 				self.network_chan.send(NetworkMsg::ReportPeer(
 					who,
@@ -803,6 +803,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 		if let Ok(info) = self.context_data.chain.info() {
 			let status = message::generic::Status {
 				version: CURRENT_VERSION,
+				min_supported_version: MIN_VERSION,
 				genesis_hash: info.chain.genesis_hash,
 				roles: self.config.roles.into(),
 				best_number: info.chain.best_number,
diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs
index f857e712d34bafeecaf6bd018ce20626351f0ac9..81e79dbad9fdcead203c223a5897d1e7fa561e9a 100644
--- a/substrate/core/network/src/service.rs
+++ b/substrate/core/network/src/service.rs
@@ -25,7 +25,7 @@ use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, S
 use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol};
 use consensus::import_queue::{ImportQueue, Link};
 use crate::consensus_gossip::ConsensusGossip;
-use crate::message::Message;
+use crate::message::{Message, ConsensusEngineId};
 use crate::protocol::{self, Context, Protocol, ProtocolMsg, ProtocolStatus, PeerInfo};
 use crate::config::Params;
 use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError};
@@ -208,11 +208,11 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
 	}
 
 	/// Send a consensus message through the gossip
-	pub fn gossip_consensus_message(&self, topic: B::Hash, message: Vec<u8>, broadcast: bool) {
+	pub fn gossip_consensus_message(&self, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec<u8>) {
 		let _ = self
 			.protocol_sender
 			.send(ProtocolMsg::GossipConsensusMessage(
-				topic, message, broadcast,
+				topic, engine_id, message,
 			));
 	}
 
diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs
index 71cead8da9bfd0483592885abd4b31529b59ccf6..c16b2b8fd5781ae14d9de83a18e746338f5a2db9 100644
--- a/substrate/core/network/src/test/mod.rs
+++ b/substrate/core/network/src/test/mod.rs
@@ -34,12 +34,12 @@ use consensus::import_queue::{BasicQueue, ImportQueue, IncomingBlock};
 use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier};
 use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind};
 use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport};
-use crate::consensus_gossip::{ConsensusGossip, ConsensusMessage};
+use crate::consensus_gossip::ConsensusGossip;
 use crossbeam_channel::{self as channel, Sender, select};
 use futures::Future;
 use futures::sync::{mpsc, oneshot};
 use keyring::Keyring;
-use crate::message::Message;
+use crate::message::{Message, ConsensusEngineId};
 use network_libp2p::{NodeIndex, ProtocolId};
 use parity_codec::Encode;
 use parking_lot::Mutex;
@@ -274,21 +274,21 @@ impl<D> Peer<D> {
 
 	/// Push a message into the gossip network and relay to peers.
 	/// `TestNet::sync_step` needs to be called to ensure it's propagated.
-	pub fn gossip_message(&self, topic: <Block as BlockT>::Hash, data: Vec<u8>, broadcast: bool) {
+	pub fn gossip_message(&self, topic: <Block as BlockT>::Hash, engine_id: ConsensusEngineId, data: Vec<u8>) {
 		let _ = self
 			.protocol_sender
-			.send(ProtocolMsg::GossipConsensusMessage(topic, data, broadcast));
+			.send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data));
 	}
 
-	pub fn consensus_gossip_collect_garbage_for_topic(&self, topic: <Block as BlockT>::Hash) {
-		self.with_gossip(move |gossip, _| gossip.collect_garbage_for_topic(topic))
+	pub fn consensus_gossip_collect_garbage_for_topic(&self, _topic: <Block as BlockT>::Hash) {
+		self.with_gossip(move |gossip, _| gossip.collect_garbage())
 	}
 
 	/// access the underlying consensus gossip handler
 	pub fn consensus_gossip_messages_for(
 		&self,
 		topic: <Block as BlockT>::Hash,
-	) -> mpsc::UnboundedReceiver<ConsensusMessage> {
+	) -> mpsc::UnboundedReceiver<Vec<u8>> {
 		let (tx, rx) = oneshot::channel();
 		self.with_gossip(move |gossip, _| {
 			let inner_rx = gossip.messages_for(topic);