From bc3d283e781131bc31fee214f27ff245b528102d Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Thu, 9 Jan 2020 19:24:51 +0100
Subject: [PATCH] Clean-ups in the network-gossip crate (#4542)

* Remove usage of sc_network::Context trait

* Remove Context::send_consensus

* Pass &mut dyn Network instead of &dyn Network

* Move Validator traits and related to separate module
---
 substrate/client/network-gossip/src/bridge.rs |  78 ++------
 substrate/client/network-gossip/src/lib.rs    |   6 +-
 .../network-gossip/src/state_machine.rs       | 173 ++++--------------
 .../client/network-gossip/src/validator.rs    | 103 +++++++++++
 substrate/client/network/src/protocol.rs      |  34 ----
 5 files changed, 159 insertions(+), 235 deletions(-)
 create mode 100644 substrate/client/network-gossip/src/validator.rs

diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs
index 7eeb33131db..2b0b19b8761 100644
--- a/substrate/client/network-gossip/src/bridge.rs
+++ b/substrate/client/network-gossip/src/bridge.rs
@@ -14,10 +14,9 @@
 // You should have received a copy of the GNU General Public License
 // along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
 
-use crate::Network;
-use crate::state_machine::{ConsensusGossip, Validator, TopicNotification};
+use crate::{Network, Validator};
+use crate::state_machine::{ConsensusGossip, TopicNotification};
 
-use sc_network::Context;
 use sc_network::message::generic::ConsensusMessage;
 use sc_network::{Event, ReputationChange};
 
@@ -36,37 +35,29 @@ pub struct GossipEngine<B: BlockT> {
 
 struct GossipEngineInner<B: BlockT> {
 	state_machine: ConsensusGossip<B>,
-	context: Box<dyn Context<B> + Send>,
-	context_ext: Box<dyn ContextExt<B> + Send>,
+	network: Box<dyn Network<B> + Send>,
 }
 
 impl<B: BlockT> GossipEngine<B> {
 	/// Create a new instance.
 	pub fn new<N: Network<B> + Send + Clone + 'static>(
-		network: N,
+		mut network: N,
 		executor: &impl futures::task::Spawn,
 		engine_id: ConsensusEngineId,
 		validator: Arc<dyn Validator<B>>,
 	) -> Self where B: 'static {
 		let mut state_machine = ConsensusGossip::new();
-		let mut context = Box::new(ContextOverService {
-			network: network.clone(),
-		});
-		let context_ext = Box::new(ContextOverService {
-			network: network.clone(),
-		});
 
 		// We grab the event stream before registering the notifications protocol, otherwise we
 		// might miss events.
 		let event_stream = network.event_stream();
 
 		network.register_notifications_protocol(engine_id);
-		state_machine.register_validator(&mut *context, engine_id, validator);
+		state_machine.register_validator(&mut network, engine_id, validator);
 
 		let inner = Arc::new(Mutex::new(GossipEngineInner {
 			state_machine,
-			context,
-			context_ext,
+			network: Box::new(network),
 		}));
 
 		let gossip_engine = GossipEngine {
@@ -82,7 +73,7 @@ impl<B: BlockT> GossipEngine<B> {
 					if let Some(inner) = inner.upgrade() {
 						let mut inner = inner.lock();
 						let inner = &mut *inner;
-						inner.state_machine.tick(&mut *inner.context);
+						inner.state_machine.tick(&mut *inner.network);
 					} else {
 						// We reach this branch if the `Arc<GossipEngineInner>` has no reference
 						// left. We can now let the task end.
@@ -107,7 +98,7 @@ impl<B: BlockT> GossipEngine<B> {
 						}
 						let mut inner = inner.lock();
 						let inner = &mut *inner;
-						inner.state_machine.new_peer(&mut *inner.context, remote, roles);
+						inner.state_machine.new_peer(&mut *inner.network, remote, roles);
 					}
 					Event::NotificationsStreamClosed { remote, engine_id: msg_engine_id } => {
 						if msg_engine_id != engine_id {
@@ -115,13 +106,13 @@ impl<B: BlockT> GossipEngine<B> {
 						}
 						let mut inner = inner.lock();
 						let inner = &mut *inner;
-						inner.state_machine.peer_disconnected(&mut *inner.context, remote);
+						inner.state_machine.peer_disconnected(&mut *inner.network, remote);
 					},
 					Event::NotificationsReceived { remote, messages } => {
 						let mut inner = inner.lock();
 						let inner = &mut *inner;
 						inner.state_machine.on_incoming(
-							&mut *inner.context,
+							&mut *inner.network,
 							remote,
 							messages.into_iter()
 								.filter_map(|(engine, data)| if engine == engine_id {
@@ -144,7 +135,7 @@ impl<B: BlockT> GossipEngine<B> {
 	}
 
 	pub fn report(&self, who: PeerId, reputation: ReputationChange) {
-		self.inner.lock().context.report_peer(who, reputation);
+		self.inner.lock().network.report_peer(who, reputation);
 	}
 
 	/// Registers a message without propagating it to any peers. The message
@@ -169,7 +160,7 @@ impl<B: BlockT> GossipEngine<B> {
 	pub fn broadcast_topic(&self, topic: B::Hash, force: bool) {
 		let mut inner = self.inner.lock();
 		let inner = &mut *inner;
-		inner.state_machine.broadcast_topic(&mut *inner.context, topic, force);
+		inner.state_machine.broadcast_topic(&mut *inner.network, topic, force);
 	}
 
 	/// Get data of valid, incoming messages for a topic (but might have expired meanwhile).
@@ -188,7 +179,7 @@ impl<B: BlockT> GossipEngine<B> {
 	) {
 		let mut inner = self.inner.lock();
 		let inner = &mut *inner;
-		inner.state_machine.send_topic(&mut *inner.context, who, topic, self.engine_id, force)
+		inner.state_machine.send_topic(&mut *inner.network, who, topic, self.engine_id, force)
 	}
 
 	/// Multicast a message to all peers.
@@ -205,7 +196,7 @@ impl<B: BlockT> GossipEngine<B> {
 
 		let mut inner = self.inner.lock();
 		let inner = &mut *inner;
-		inner.state_machine.multicast(&mut *inner.context, topic, message, force)
+		inner.state_machine.multicast(&mut *inner.network, topic, message, force)
 	}
 
 	/// Send addressed message to the given peers. The message is not kept or multicast
@@ -215,7 +206,7 @@ impl<B: BlockT> GossipEngine<B> {
 		let inner = &mut *inner;
 
 		for who in &who {
-			inner.state_machine.send_message(&mut *inner.context, who, ConsensusMessage {
+			inner.state_machine.send_message(&mut *inner.network, who, ConsensusMessage {
 				engine_id: self.engine_id,
 				data: data.clone(),
 			});
@@ -227,7 +218,7 @@ impl<B: BlockT> GossipEngine<B> {
 	/// Note: this method isn't strictly related to gossiping and should eventually be moved
 	/// somewhere else.
 	pub fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
-		self.inner.lock().context_ext.announce(block, associated_data);
+		self.inner.lock().network.announce(block, associated_data);
 	}
 }
 
@@ -239,40 +230,3 @@ impl<B: BlockT> Clone for GossipEngine<B> {
 		}
 	}
 }
-
-struct ContextOverService<N> {
-	network: N,
-}
-
-impl<B: BlockT, N: Network<B>> Context<B> for ContextOverService<N> {
-	fn report_peer(&mut self, who: PeerId, reputation: ReputationChange) {
-		self.network.report_peer(who, reputation);
-	}
-
-	fn disconnect_peer(&mut self, who: PeerId) {
-		self.network.disconnect_peer(who)
-	}
-
-	fn send_consensus(&mut self, who: PeerId, messages: Vec<ConsensusMessage>) {
-		for message in messages {
-			self.network.write_notification(who.clone(), message.engine_id, message.data);
-		}
-	}
-
-	fn send_chain_specific(&mut self, _: PeerId, _: Vec<u8>) {
-		log::error!(
-			target: "sub-libp2p",
-			"send_chain_specific has been called in a context where it shouldn't"
-		);
-	}
-}
-
-trait ContextExt<B: BlockT> {
-	fn announce(&self, block: B::Hash, associated_data: Vec<u8>);
-}
-
-impl<B: BlockT, N: Network<B>> ContextExt<B> for ContextOverService<N> {
-	fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
-		Network::announce(&self.network, block, associated_data)
-	}
-}
diff --git a/substrate/client/network-gossip/src/lib.rs b/substrate/client/network-gossip/src/lib.rs
index f7b360f939c..705a27210ac 100644
--- a/substrate/client/network-gossip/src/lib.rs
+++ b/substrate/client/network-gossip/src/lib.rs
@@ -55,9 +55,8 @@
 //! used to inform peers of a current view of protocol state.
 
 pub use self::bridge::GossipEngine;
-pub use self::state_machine::{TopicNotification, MessageIntent};
-pub use self::state_machine::{Validator, ValidatorContext, ValidationResult};
-pub use self::state_machine::DiscardAll;
+pub use self::state_machine::TopicNotification;
+pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext, ValidationResult};
 
 use futures::prelude::*;
 use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange};
@@ -66,6 +65,7 @@ use std::sync::Arc;
 
 mod bridge;
 mod state_machine;
+mod validator;
 
 /// Abstraction over a network.
 pub trait Network<B: BlockT> {
diff --git a/substrate/client/network-gossip/src/state_machine.rs b/substrate/client/network-gossip/src/state_machine.rs
index 3e54e452db8..d1931b1bd29 100644
--- a/substrate/client/network-gossip/src/state_machine.rs
+++ b/substrate/client/network-gossip/src/state_machine.rs
@@ -14,6 +14,8 @@
 // You should have received a copy of the GNU General Public License
 // along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
 
+use crate::{Network, MessageIntent, Validator, ValidatorContext, ValidationResult};
+
 use std::collections::{HashMap, HashSet, hash_map::Entry};
 use std::sync::Arc;
 use std::iter;
@@ -25,7 +27,6 @@ use libp2p::PeerId;
 use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
 use sp_runtime::ConsensusEngineId;
 pub use sc_network::message::generic::{Message, ConsensusMessage};
-use sc_network::Context;
 use sc_network::config::Roles;
 
 // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
@@ -67,62 +68,23 @@ struct MessageEntry<B: BlockT> {
 	sender: Option<PeerId>,
 }
 
-/// The reason for sending out the message.
-#[derive(Eq, PartialEq, Copy, Clone)]
-#[cfg_attr(test, derive(Debug))]
-pub enum MessageIntent {
-	/// Requested broadcast.
-	Broadcast,
-	/// Requested broadcast to all peers.
-	ForcedBroadcast,
-	/// Periodic rebroadcast of all messages to all peers.
-	PeriodicRebroadcast,
-}
-
-/// Message validation result.
-pub enum ValidationResult<H> {
-	/// Message should be stored and propagated under given topic.
-	ProcessAndKeep(H),
-	/// Message should be processed, but not propagated.
-	ProcessAndDiscard(H),
-	/// Message should be ignored.
-	Discard,
-}
-
-impl MessageIntent {
-	fn broadcast() -> MessageIntent {
-		MessageIntent::Broadcast
-	}
-}
-
-/// Validation context. Allows reacting to incoming messages by sending out further messages.
-pub trait ValidatorContext<B: BlockT> {
-	/// Broadcast all messages with given topic to peers that do not have it yet.
-	fn broadcast_topic(&mut self, topic: B::Hash, force: bool);
-	/// Broadcast a message to all peers that have not received it previously.
-	fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool);
-	/// Send addressed message to a peer.
-	fn send_message(&mut self, who: &PeerId, message: Vec<u8>);
-	/// Send all messages with given topic to a peer.
-	fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool);
-}
-
+/// Local implementation of `ValidatorContext`.
 struct NetworkContext<'g, 'p, B: BlockT> {
 	gossip: &'g mut ConsensusGossip<B>,
-	protocol: &'p mut dyn Context<B>,
+	network: &'p mut dyn Network<B>,
 	engine_id: ConsensusEngineId,
 }
 
 impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
 	/// Broadcast all messages with given topic to peers that do not have it yet.
 	fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
-		self.gossip.broadcast_topic(self.protocol, topic, force);
+		self.gossip.broadcast_topic(self.network, topic, force);
 	}
 
 	/// Broadcast a message to all peers that have not received it previously.
 	fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
 		self.gossip.multicast(
-			self.protocol,
+			self.network,
 			topic,
 			ConsensusMessage{ data: message, engine_id: self.engine_id.clone() },
 			force,
@@ -131,20 +93,17 @@ impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
 
 	/// Send addressed message to a peer.
 	fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
-		self.protocol.send_consensus(who.clone(), vec![ConsensusMessage {
-			engine_id: self.engine_id,
-			data: message,
-		}]);
+		self.network.write_notification(who.clone(), self.engine_id, message);
 	}
 
 	/// Send all messages with given topic to a peer.
 	fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
-		self.gossip.send_topic(self.protocol, who, topic, self.engine_id, force);
+		self.gossip.send_topic(self.network, who, topic, self.engine_id, force);
 	}
 }
 
 fn propagate<'a, B: BlockT, I>(
-	protocol: &mut dyn Context<B>,
+	network: &mut dyn Network<B>,
 	messages: I,
 	intent: MessageIntent,
 	peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
@@ -168,7 +127,6 @@ fn propagate<'a, B: BlockT, I>(
 	};
 
 	for (id, ref mut peer) in peers.iter_mut() {
-		let mut batch = Vec::new();
 		for (message_hash, topic, message) in messages.clone() {
 			let intent = match intent {
 				MessageIntent::Broadcast { .. } =>
@@ -195,38 +153,8 @@ fn propagate<'a, B: BlockT, I>(
 			peer.known_messages.insert(message_hash.clone());
 
 			trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
-			batch.push(message.clone())
+			network.write_notification(id.clone(), message.engine_id, message.data.clone());
 		}
-		protocol.send_consensus(id.clone(), batch);
-	}
-}
-
-/// Validates consensus messages.
-pub trait Validator<B: BlockT>: Send + Sync {
-	/// New peer is connected.
-	fn new_peer(&self, _context: &mut dyn ValidatorContext<B>, _who: &PeerId, _roles: Roles) {
-	}
-
-	/// New connection is dropped.
-	fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<B>, _who: &PeerId) {
-	}
-
-	/// Validate consensus message.
-	fn validate(
-		&self,
-		context: &mut dyn ValidatorContext<B>,
-		sender: &PeerId,
-		data: &[u8]
-	) -> ValidationResult<B::Hash>;
-
-	/// Produce a closure for validating messages on a given topic.
-	fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
-		Box::new(move |_topic, _data| false)
-	}
-
-	/// Produce a closure for filtering egress messages.
-	fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
-		Box::new(move |_who, _intent, _topic, _data| true)
 	}
 }
 
@@ -256,14 +184,14 @@ impl<B: BlockT> ConsensusGossip<B> {
 	/// Register message validator for a message type.
 	pub fn register_validator(
 		&mut self,
-		protocol: &mut dyn Context<B>,
+		network: &mut dyn Network<B>,
 		engine_id: ConsensusEngineId,
 		validator: Arc<dyn Validator<B>>
 	) {
 		self.register_validator_internal(engine_id, validator.clone());
 		let peers: Vec<_> = self.peers.iter().map(|(id, peer)| (id.clone(), peer.roles)).collect();
 		for (id, roles) in peers {
-			let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
+			let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
 			validator.new_peer(&mut context, &id, roles);
 		}
 	}
@@ -273,7 +201,7 @@ impl<B: BlockT> ConsensusGossip<B> {
 	}
 
 	/// Handle new connected peer.
-	pub fn new_peer(&mut self, protocol: &mut dyn Context<B>, who: PeerId, roles: Roles) {
+	pub fn new_peer(&mut self, network: &mut dyn Network<B>, who: PeerId, roles: Roles) {
 		// light nodes are not valid targets for consensus gossip messages
 		if !roles.is_full() {
 			return;
@@ -285,7 +213,7 @@ impl<B: BlockT> ConsensusGossip<B> {
 			roles,
 		});
 		for (engine_id, v) in self.validators.clone() {
-			let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
+			let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
 			v.new_peer(&mut context, &who, roles);
 		}
 	}
@@ -322,37 +250,37 @@ impl<B: BlockT> ConsensusGossip<B> {
 	}
 
 	/// Call when a peer has been disconnected to stop tracking gossip status.
-	pub fn peer_disconnected(&mut self, protocol: &mut dyn Context<B>, who: PeerId) {
+	pub fn peer_disconnected(&mut self, network: &mut dyn Network<B>, who: PeerId) {
 		for (engine_id, v) in self.validators.clone() {
-			let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
+			let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
 			v.peer_disconnected(&mut context, &who);
 		}
 	}
 
 	/// Perform periodic maintenance
-	pub fn tick(&mut self, protocol: &mut dyn Context<B>) {
+	pub fn tick(&mut self, network: &mut dyn Network<B>) {
 		self.collect_garbage();
 		if time::Instant::now() >= self.next_broadcast {
-			self.rebroadcast(protocol);
+			self.rebroadcast(network);
 			self.next_broadcast = time::Instant::now() + REBROADCAST_INTERVAL;
 		}
 	}
 
 	/// Rebroadcast all messages to all peers.
-	fn rebroadcast(&mut self, protocol: &mut dyn Context<B>) {
+	fn rebroadcast(&mut self, network: &mut dyn Network<B>) {
 		let messages = self.messages.iter()
 			.map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
-		propagate(protocol, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators);
+		propagate(network, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators);
 	}
 
 	/// Broadcast all messages with given topic.
-	pub fn broadcast_topic(&mut self, protocol: &mut dyn Context<B>, topic: B::Hash, force: bool) {
+	pub fn broadcast_topic(&mut self, network: &mut dyn Network<B>, topic: B::Hash, force: bool) {
 		let messages = self.messages.iter()
 			.filter_map(|entry|
 				if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None }
 			);
-		let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() };
-		propagate(protocol, messages, intent, &mut self.peers, &self.validators);
+		let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
+		propagate(network, messages, intent, &mut self.peers, &self.validators);
 	}
 
 	/// Prune old or no longer relevant consensus messages. Provide a predicate
@@ -420,7 +348,7 @@ impl<B: BlockT> ConsensusGossip<B> {
 	/// in all other cases.
 	pub fn on_incoming(
 		&mut self,
-		protocol: &mut dyn Context<B>,
+		network: &mut dyn Network<B>,
 		who: PeerId,
 		messages: Vec<ConsensusMessage>,
 	) {
@@ -430,7 +358,7 @@ impl<B: BlockT> ConsensusGossip<B> {
 
 			if self.known_messages.contains(&message_hash) {
 				trace!(target:"gossip", "Ignored already known message from {}", who);
-				protocol.report_peer(who.clone(), rep::DUPLICATE_GOSSIP);
+				network.report_peer(who.clone(), rep::DUPLICATE_GOSSIP);
 				continue;
 			}
 
@@ -439,7 +367,7 @@ impl<B: BlockT> ConsensusGossip<B> {
 			let validation = self.validators.get(&engine_id)
 				.cloned()
 				.map(|v| {
-					let mut context = NetworkContext { gossip: self, protocol, engine_id };
+					let mut context = NetworkContext { gossip: self, network, engine_id };
 					v.validate(&mut context, &who, &message.data)
 				});
 
@@ -449,14 +377,14 @@ impl<B: BlockT> ConsensusGossip<B> {
 				Some(ValidationResult::Discard) => None,
 				None => {
 					trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
-					protocol.report_peer(who.clone(), rep::UNKNOWN_GOSSIP);
-					protocol.disconnect_peer(who.clone());
+					network.report_peer(who.clone(), rep::UNKNOWN_GOSSIP);
+					network.disconnect_peer(who.clone());
 					continue;
 				}
 			};
 
 			if let Some((topic, keep)) = validation_result {
-				protocol.report_peer(who.clone(), rep::GOSSIP_SUCCESS);
+				network.report_peer(who.clone(), rep::GOSSIP_SUCCESS);
 				if let Some(ref mut peer) = self.peers.get_mut(&who) {
 					peer.known_messages.insert(message_hash);
 					if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
@@ -479,7 +407,7 @@ impl<B: BlockT> ConsensusGossip<B> {
 					}
 				} else {
 					trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
-					protocol.report_peer(who.clone(), rep::UNREGISTERED_TOPIC);
+					network.report_peer(who.clone(), rep::UNREGISTERED_TOPIC);
 				}
 			} else {
 				trace!(target:"gossip", "Handled valid one hop message from peer {}", who);
@@ -490,7 +418,7 @@ impl<B: BlockT> ConsensusGossip<B> {
 	/// Send all messages with given topic to a peer.
 	pub fn send_topic(
 		&mut self,
-		protocol: &mut dyn Context<B>,
+		network: &mut dyn Network<B>,
 		who: &PeerId,
 		topic: B::Hash,
 		engine_id: ConsensusEngineId,
@@ -503,7 +431,6 @@ impl<B: BlockT> ConsensusGossip<B> {
 		};
 
 		if let Some(ref mut peer) = self.peers.get_mut(who) {
-			let mut batch = Vec::new();
 			for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) {
 				let intent = if force {
 					MessageIntent::ForcedBroadcast
@@ -522,34 +449,30 @@ impl<B: BlockT> ConsensusGossip<B> {
 				peer.known_messages.insert(entry.message_hash.clone());
 
 				trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
-				batch.push(ConsensusMessage {
-					engine_id: engine_id.clone(),
-					data: entry.message.data.clone(),
-				});
+				network.write_notification(who.clone(), engine_id, entry.message.data.clone());
 			}
-			protocol.send_consensus(who.clone(), batch);
 		}
 	}
 
 	/// Multicast a message to all peers.
 	pub fn multicast(
 		&mut self,
-		protocol: &mut dyn Context<B>,
+		network: &mut dyn Network<B>,
 		topic: B::Hash,
 		message: ConsensusMessage,
 		force: bool,
 	) {
 		let message_hash = HashFor::<B>::hash(&message.data);
 		self.register_message_hashed(message_hash, topic, message.clone(), None);
-		let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() };
-		propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
+		let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
+		propagate(network, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
 	}
 
 	/// Send addressed message to a peer. The message is not kept or multicast
 	/// later on.
 	pub fn send_message(
 		&mut self,
-		protocol: &mut dyn Context<B>,
+		network: &mut dyn Network<B>,
 		who: &PeerId,
 		message: ConsensusMessage,
 	) {
@@ -563,29 +486,7 @@ impl<B: BlockT> ConsensusGossip<B> {
 		trace!(target: "gossip", "Sending direct to {}: {:?}", who, message);
 
 		peer.known_messages.insert(message_hash);
-		protocol.send_consensus(who.clone(), vec![message.clone()]);
-	}
-}
-
-/// A gossip message validator that discards all messages.
-pub struct DiscardAll;
-
-impl<B: BlockT> Validator<B> for DiscardAll {
-	fn validate(
-		&self,
-		_context: &mut dyn ValidatorContext<B>,
-		_sender: &PeerId,
-		_data: &[u8],
-	) -> ValidationResult<B::Hash> {
-		ValidationResult::Discard
-	}
-
-	fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
-		Box::new(move |_topic, _data| true)
-	}
-
-	fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
-		Box::new(move |_who, _intent, _topic, _data| false)
+		network.write_notification(who.clone(), message.engine_id, message.data);
 	}
 }
 
diff --git a/substrate/client/network-gossip/src/validator.rs b/substrate/client/network-gossip/src/validator.rs
new file mode 100644
index 00000000000..74b5307ee9c
--- /dev/null
+++ b/substrate/client/network-gossip/src/validator.rs
@@ -0,0 +1,103 @@
+// Copyright 2017-2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
+
+use sc_network::{config::Roles, PeerId};
+use sp_runtime::traits::Block as BlockT;
+
+/// Validates consensus messages.
+pub trait Validator<B: BlockT>: Send + Sync {
+	/// New peer is connected.
+	fn new_peer(&self, _context: &mut dyn ValidatorContext<B>, _who: &PeerId, _roles: Roles) {
+	}
+
+	/// New connection is dropped.
+	fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<B>, _who: &PeerId) {
+	}
+
+	/// Validate consensus message.
+	fn validate(
+		&self,
+		context: &mut dyn ValidatorContext<B>,
+		sender: &PeerId,
+		data: &[u8]
+	) -> ValidationResult<B::Hash>;
+
+	/// Produce a closure for validating messages on a given topic.
+	fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
+		Box::new(move |_topic, _data| false)
+	}
+
+	/// Produce a closure for filtering egress messages.
+	fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
+		Box::new(move |_who, _intent, _topic, _data| true)
+	}
+}
+
+/// Validation context. Allows reacting to incoming messages by sending out further messages.
+pub trait ValidatorContext<B: BlockT> {
+	/// Broadcast all messages with given topic to peers that do not have it yet.
+	fn broadcast_topic(&mut self, topic: B::Hash, force: bool);
+	/// Broadcast a message to all peers that have not received it previously.
+	fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool);
+	/// Send addressed message to a peer.
+	fn send_message(&mut self, who: &PeerId, message: Vec<u8>);
+	/// Send all messages with given topic to a peer.
+	fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool);
+}
+
+/// The reason for sending out the message.
+#[derive(Eq, PartialEq, Copy, Clone)]
+#[cfg_attr(test, derive(Debug))]
+pub enum MessageIntent {
+	/// Requested broadcast.
+	Broadcast,
+	/// Requested broadcast to all peers.
+	ForcedBroadcast,
+	/// Periodic rebroadcast of all messages to all peers.
+	PeriodicRebroadcast,
+}
+
+/// Message validation result.
+pub enum ValidationResult<H> {
+	/// Message should be stored and propagated under given topic.
+	ProcessAndKeep(H),
+	/// Message should be processed, but not propagated.
+	ProcessAndDiscard(H),
+	/// Message should be ignored.
+	Discard,
+}
+
+/// A gossip message validator that discards all messages.
+pub struct DiscardAll;
+
+impl<B: BlockT> Validator<B> for DiscardAll {
+	fn validate(
+		&self,
+		_context: &mut dyn ValidatorContext<B>,
+		_sender: &PeerId,
+		_data: &[u8],
+	) -> ValidationResult<B::Hash> {
+		ValidationResult::Discard
+	}
+
+	fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
+		Box::new(move |_topic, _data| true)
+	}
+
+	fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
+		Box::new(move |_who, _intent, _topic, _data| false)
+	}
+}
diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs
index 983cdd25a89..b712ebe5155 100644
--- a/substrate/client/network/src/protocol.rs
+++ b/substrate/client/network/src/protocol.rs
@@ -80,8 +80,6 @@ pub(crate) const MIN_VERSION: u32 = 3;
 
 // Maximum allowed entries in `BlockResponse`
 const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
-// Maximum allowed entries in `ConsensusBatch`
-const MAX_CONSENSUS_MESSAGES: usize = 256;
 /// When light node connects to the full node and the full node is behind light node
 /// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it unuseful
 /// and disconnect to free connection slot.
@@ -327,9 +325,6 @@ pub trait Context<B: BlockT> {
 	/// Force disconnecting from a peer. Use this when a peer misbehaved.
 	fn disconnect_peer(&mut self, who: PeerId);
 
-	/// Send a consensus message to a peer.
-	fn send_consensus(&mut self, who: PeerId, messages: Vec<ConsensusMessage>);
-
 	/// Send a chain-specific message to a peer.
 	fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>);
 }
@@ -360,35 +355,6 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B,
 		self.behaviour.disconnect_peer(&who)
 	}
 
-	fn send_consensus(&mut self, who: PeerId, messages: Vec<ConsensusMessage>) {
-		if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 4) {
-			let mut batch = Vec::new();
-			let len = messages.len();
-			for (index, message) in messages.into_iter().enumerate() {
-				batch.reserve(MAX_CONSENSUS_MESSAGES);
-				batch.push(message);
-				if batch.len() == MAX_CONSENSUS_MESSAGES || index == len - 1 {
-					send_message::<B> (
-						self.behaviour,
-						&mut self.context_data.stats,
-						&who,
-						GenericMessage::ConsensusBatch(std::mem::replace(&mut batch, Vec::new())),
-					)
-				}
-			}
-		} else {
-			// Backwards compatibility
-			for message in messages {
-				send_message::<B> (
-					self.behaviour,
-					&mut self.context_data.stats,
-					&who,
-					GenericMessage::Consensus(message)
-				)
-			}
-		}
-	}
-
 	fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>) {
 		send_message::<B> (
 			self.behaviour,
-- 
GitLab